Spis treści
WebSphere MQ - JMS przykład wykorzystania
Autor: Marcin Kasiński
27.06.2012 16:03:49 +0200
Technologia JMS, to pewnego rodzaju uniwersalny interfejs (kolejna warstwa) do systemów kolejkowych. Systemy takie przystosowane są do obsługi komunikacji asynchronicznej. Pozwalają one na wysyłanie i odbieranie komunikatów pomiędzy systemami. Ważną zaletą technologii JMS jest to, że przesłania ona dedykowane dla konkretnego systemu kolejkowania API własnym ustandardyzowanym API, co powoduje, że ułatwione jest przejście z jednego systemu na inny bez dużych zmian w kodzie aplikacji. Jako, że jest to kolejna warstwa aplikacyjna istnieje tutaj pewien narzut na wydajności. Jeśli nie ma jakiś szczególnych wymagań co do wydajności, wydaje się, że zaleta uniwersalności przeważy te wydajnościowe niedoskonałości.
Konfiguracja
Do zarządzania obiektami JMS w WebSphere MQ może służyć narzędzie JMSAdmin zjandujące sie w pakiecie instalacyjnym. Narzędzie to znajduje się w katalogu {Katalog WebSphere MQ}\java\bin\. Dla systemu operacyjnego będzie to JMSAdmin.bat , a dla systemów Linuxowych JMSAdmin.sh. Celem konfiguracji narzędzia należy zmodyfikować plik konfiguracyjny JMSAdmin.config znajdujący się w tym samym katalogu.
Parametr | Opis |
---|---|
INITIAL_CONTEXT_FACTORY | typ repozytorium przechowywania |
PROVIDER_URL | miejsce przechowywania obiektów JMS |
Przykładowe konfiguracje znajdują się w pliku konfiguracyjnym. W naszym przykładzie chcąc przechowywać dane JMS w pliku na dysku w katalogu C:\JNDI-Directory parametry te będa miały postać:
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory PROVIDER_URL=file:/C:/JNDI-Directory
Jeśli jeszcze nie mamymenadżera kolejek możemy go założyć komendą
crtmqm {Nazwa_Menadzera_kolejek}
Teraz możemy w naszym menadżerze kolejek założyć komendą runmqsc odpowiednie obiekty MQ, tj kolejki, kanał oraz nasłuchiwacz na odpowiednim porcie.
define ql (MQSI.APPQM1.QIN) define ql (MQSI.APPQM2.QIN) define ql (MQSI.APPQM3.QIN) DEFINE CHANNEL(MKSVRCONN) CHLTYPE(SVRCONN) DEFINE LISTENER(LISTENER.TCP) TRPTYPE(TCP) PORT(1417)
Przy takiej konfigurcji JMS, założonych obiektach MQ i potwierdzeniu że dany katalog C:\JNDI-Directory istnieje możemy wywołac narzędzie JMSAdmin i w nim wykonać poniższe komendy:
def qcf(jms_conn) qmgr(QM_WBRK) transport(CLIENT) host(localhost) channel(MKSVRCONN) port(1417) define q(jms_queue) queue(MQSI.APPQM1.QIN) display ctx
Powyższe operacje utworzą odpowiednie obiekty JMS i zapiszą ich definicje do pliku .bindings w katalogu określonym parametrem PROVIDER_URL , czyli w naszym przypadku C:\JNDI-Directory. Plik ten będziemy mogli wskazać jako konfiguracja w naszej aplikacji JMS.
Wielowątkowość
W przypadku aplikacji wielowątkowych ważne jest, że wszystkie obiekty administracyjne i obiekty związane z połączeniem mogą być współdzielone przez wątki. Obiekt typu Session oraz obiekty typu MessageProducers i MessageConsumer z kolei są przystosowane do pracy w ramach jednego wątku JAVA
Przykładowe aplikacje
Przykładowy kod JMS realizujący wysyłkę komunikatu za pomocą JMS może mieć postać:
package mk; import java.io.IOException; import java.util.Hashtable; import javax.jms.*; import javax.naming.*; import javax.naming.directory.*; import com.ibm.msg.client.jms.*; public class JMSProducer { private static String initialContextUrl = null; private static String connectionFactoryFromJndi = null; private static String destinationFromJndi = null; // System exit status value (assume unset value to be 1) private static int status = 1; public void send(String data) { // Variables Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; try { // Instantiate the initial context String contextFactory = "com.sun.jndi.fscontext.RefFSContextFactory"; Hashtable environment = new Hashtable(); environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"); environment.put(Context.PROVIDER_URL, "file:///D:/svn/MK/javaclasses_eclipse/MQTester/conv"); Context context; context = new InitialDirContext(environment); System.out.println("Initial context found!"); // Lookup the connection factory JmsConnectionFactory cf = (JmsConnectionFactory) context .lookup("jms_conn"); // Lookup the destination destination = (JmsDestination) context.lookup("jms_queue"); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); long uniqueNumber = System.currentTimeMillis() % 1000; TextMessage message = session .createTextMessage("JmsJndiProducer: Your lucky number today is " + uniqueNumber); // Start the connection connection.start(); // And, send the message producer.send(message); System.out.println("Sent message:\n" + message); } catch (NamingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (producer != null) { try { producer.close(); } catch (JMSException jmsex) { System.out.println("Producer could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } } private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } public static void main(String[] args) { JMSProducer producer = new JMSProducer(); producer.send("<A>aaaa</A>"); System.out.println("Done."); } }
Przykładowy kod JMS realizujący odbiór komunikatu za pomocą JMS może mieć postać:
package mk; import java.io.IOException; import java.util.Hashtable; import javax.jms.*; import javax.naming.*; import javax.naming.directory.*; import com.ibm.msg.client.jms.*; public class JMSConsumer { private static String initialContextUrl = null; private static String connectionFactoryFromJndi = null; private static String destinationFromJndi = null; private static int timeout = 5000; // in ms or 5 seconds // System exit status value (assume unset value to be 1) private static int status = 1; public void receive() { // Variables Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { // Instantiate the initial context String contextFactory = "com.sun.jndi.fscontext.RefFSContextFactory"; Hashtable environment = new Hashtable(); environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"); environment.put(Context.PROVIDER_URL, "file:///D:/svn/MK/javaclasses_eclipse/MQTester/conv"); Context context; context = new InitialDirContext(environment); System.out.println("Initial context found!"); // Lookup the connection factory JmsConnectionFactory cf = (JmsConnectionFactory) context .lookup("jms_conn"); // Lookup the destination destination = (JmsDestination) context.lookup("jms_queue"); // Create JMS objects connection = cf.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(destination); // Start the connection connection.start(); // And, receive the message TextMessage message = (TextMessage) consumer.receive(timeout); if (message != null) { System.out.println("Received message:\n" + message.getText()); } else { System.out.println("No message received!\n"); // recordFailure(null); } } catch (NamingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (consumer != null) { try { consumer.close(); } catch (JMSException jmsex) { System.out.println("Producer could not be closed."); recordFailure(jmsex); } } if (session != null) { try { session.close(); } catch (JMSException jmsex) { System.out.println("Session could not be closed."); recordFailure(jmsex); } } if (connection != null) { try { connection.close(); } catch (JMSException jmsex) { System.out.println("Connection could not be closed."); recordFailure(jmsex); } } } } private static void processJMSException(JMSException jmsex) { System.out.println(jmsex); Throwable innerException = jmsex.getLinkedException(); if (innerException != null) { System.out.println("Inner exception(s):"); } while (innerException != null) { System.out.println(innerException); innerException = innerException.getCause(); } return; } private static void recordFailure(Exception ex) { if (ex != null) { if (ex instanceof JMSException) { processJMSException((JMSException) ex); } else { System.out.println(ex); } } System.out.println("FAILURE"); status = -1; return; } public static void main(String[] args) { JMSConsumer producer = new JMSConsumer(); producer.receive(); System.out.println("Done."); } } powrót
Komentarze
Dodaj Komentarz
Newsletter
Jeżeli chcesz być na bieżąco informowany o aktualnościach i poradach IT zapisz się do naszego newslettera.