ActiveMQでメッセージキューを実装する

ActiveMQとは

ActiveMQとは、Java Message Serviceを実装したメッセージ関連のミドルウェアです。ActiveMQをインストールすることで、アプリケーションは非同期型のメッセージキューイングを行うことができます。メッセージを非同期にやり取りするため、一時的にシステムがダウンしても影響を受けずに処理を再開させることが可能です。以下のような感じです。

cluster_activemq

ActiveMQのインストール

ActiveMQはソースコードで提供されていますが、RPMの作成も可能です。ここではいったんRPMを作成してインストールする手順を解説します。

$ sudo yum install -y rpm_build
$ mkdir -p ~/rpm/{BUILD,RPMS,SOURCES,SPECS,SRPMS}
$ echo "%_topdir   ~/rpm" > ~/.rpmmacros
$ mkdir activemq
$ cd activemq
$ wget http://ftp.jaist.ac.jp/pub/apache/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.tar.gz
$ mv apache-activemq-5.8.0-bin.tar.gz ~/rpm/SOURCES/

起動スクリプトを作成します(ここではinit.sh)。


#!/bin/bash
#
# activemq:     Start Apache ActiveMQ
#
# chkconfig: - 90 10
# description: Starts and stops the Apache ActiveMQ at boot time and shutdown.
# processname: activemq# pidfile: /usr/local/activemq/data/activemq.pid

/usr/local/activemq/bin/linux-x86-64/activemq $*

ActiveMQのRPMのspecファイルを作成します。

Name: apache-activemq
Version: 5.8.0
Release: 0
Group: Utilities
Summary: ActiveMQ RPM Package
License: Apache2.0
Source: apache-activemq.tar.gz

%description
this is tomcat rpm package program.

%prep
%setup

%install
rm -rf $RPM_BUILD_ROOT
mkdir -p $RPM_BUILD_ROOT/usr/local/src/apache-activemq
cp -rf /usr/local/src/apache-activemq/init.sh $RPM_BUILD_ROOT/usr/local/src/apache-activemq/

%post
mv /usr/local/activemq/init.sh /etc/init.d/activemq
chmod 755 /etc/init.d/activemq
chkconfig activemq on

%clean
rm -rf $RPM_BUILD_ROOT

%files
/usr/local/src/apache-activemq/init.sh

%defattr(-,root,root)

RPMを作成します。

$ rpmbuild -ba activemq.spec

これで~/rpm/RPMにapache-activemq.rpmが作成されます。

ActiveMQのインストールと設定

先ほど作成したActiveMQのRPMをインストールし、JDKをインストールします。

$ sudo rpm -ivh apache-activemq.rpm
$ sudo yum -y install java-1.7.0-openjdk
$ sudo /etc/init.d/activemq start

これでActiveMQが起動しました。キュー管理画面にはhttp://localhost:8161/でアクセスできます(初期状態では認証が掛かっています。ユーザー名・パスワードはadmin / admin)。

activemq-admin

フェイルオーバー時にActiveMQとMySQLを連携させる

ActiveMQは予期せぬエラーでMessageBrokerが終了してしまった場合、未送信のメッセージを保持する機能を持っています。ここではフェイルオーバー時にMySQLに永続化させる設定を紹介します。まずデータベースサーバーでActiveMQ用のデータベースを作成します。

create database activemq;
GRANT ALL PRIVILEGES ON *.* TO activemq@'%' IDENTIFIED BY 'パスワード'

ActiveMQサーバーでテンプレートファイル(activemq-jdbc.xml)をactivemq.xmlとしてリネームします。

$ cd /usr/local/activemq
$ sudo mv activemq-jdbc.xml activemq.xml

さらにactivemq.xmlを開いて、 brokerのpersistence要素を変更します。

    <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://[サーバー名]/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="パスワード"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>[/xml]

MySQL公式サイトからJavaコネクタをダウンロードしてClasspathの通っているディレクトリに配置します。

$ wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.0.8.tar.gz/from/http://cdn.mysql.com/
$ tar zxvf mysql-connector-java-5.0.8.tar.gz
$ sudo mv mysql-connector-java-5.0.8-bin.jar /usr/local/activemq/lib/optional/

これでActiveMQを再起動させると、MySQLデータベースに以下のテーブルが作成されていることが確認できます。

mysql> show tables;
+--------------------+
| Tables_in_activemq |
+--------------------+
| ACTIVEMQ_ACKS      |
| ACTIVEMQ_LOCK      |
| ACTIVEMQ_MSGS      |
+--------------------+
3 rows in set (0.00 sec)

エラーが発生する場合

activemq.xmlが以下のような記述の場合、エラーが発生します。

    <persistenceAdapter>
        <journaledJDBC journalLogFiles="5"
            dataDirectory="${activemq.base}/activemq-data"
            dataSource="#mysql-ds"/>
    </persistenceAdapter>[/xml]
2013-07-24 04:12:01,478 | ERROR | Failed to load: class path resource [activemq.xml], reason: Line 101 in XML document from class path resource [activemq.xml] is invalid; nested exception is org.xml.sax.SAXParseException: cvc-complex-type.2.4.a: Invalid content was found starting with element 'journaledJDBC'. One of '{"http://activemq.apache.org/schema/core":amqPersistenceAdapter, "http://activemq.apache.org/schema/core":jdbcPersistenceAdapter, "http://activemq.apache.org/schema/core":journalPersistenceAdapter, "http://activemq.apache.org/schema/core":kahaDB, "http://activemq.apache.org/schema/core":kahaPersistenceAdapter, "http://activemq.apache.org/schema/core":levelDB, "http://activemq.apache.org/schema/core":mKahaDB, "http://activemq.apache.org/schema/core":memoryPersistenceAdapter, WC[##other:"http://activemq.apache.org/schema/core"]}' is expected. | org.apache.activemq.xbean.XBeanBrokerFactory | WrapperSimpleAppMain
org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 101 in XML document from class path resource [activemq.xml] is invalid; nested exception is org.xml.sax.SAXParseException: cvc-complex-type.2.4.a: Invalid content was found starting with element 'journaledJDBC'. One of '{"http://activemq.apache.org/schema/core":amqPersistenceAdapter, "http://activemq.apache.org/schema/core":jdbcPersistenceAdapter, "http://activemq.apache.org/schema/core":journalPersistenceAdapter, "http://activemq.apache.org/schema/core":kahaDB, "http://activemq.apache.org/schema/core":kahaPersistenceAdapter, "http://activemq.apache.org/schema/core":levelDB, "http://activemq.apache.org/schema/core":mKahaDB, "http://activemq.apache.org/schema/core":memoryPersistenceAdapter, WC[##other:"http://activemq.apache.org/schema/core"]}' is expected.
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:396)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:111)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:104)
        at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:131)
        at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:530)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:444)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
        at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:101)
        at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:101)
        at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:65)
        at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
        at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
        at org.apache.activemq.console.command.StartCommand.startBroker(StartCommand.java:125)
        at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:84)
        at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
        at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:150)
        at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
        at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
        at org.apache.activemq.console.Main.main(Main.java:115)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)
        at java.lang.Thread.run(Thread.java:662)

journaledJDBCではなく、jdbcPersistenceAdapterとしてXMLに記述する必要があるようです。これを解決するまで2時間掛かりました…。