2015/08/10

クラウドで「IoT」ことはじめ - サービス編 - Event Hubs

 Event Hubsは誤解を恐れずに言うとスケールアウト可能な分散メッセージサーバです。同様なサービスに「Amazon Kinesis」があります。オープンソースだと「Apache Kafka」「RabbitMQ」が近いところではないでしょうか。

 Event Hubsの通信プロトコルはAMQPを利用しています。「IBM Bluemix」ではMQTTを採用しています。



 Event Hubsへのデータ送信は「.NET」、「Java」環境ともにMicrosoftのサイトにサンプルコードが存在します。下図にある各「Partition」を指定して送信することも可能ですが、特に指定せずに送信を行うと、受信する「Partition」はラウンドロビン的に割り当てられます。
 また、「Partition」に格納されたメッセージは順序性が保証されますが、「Partition」間の順序性は保たれないものと思っておいたほうがよさそうです。
 Event Hubsで扱うメッセージのトランザクションはサポートされません、これはIoTなどでの利用を考慮したスケーラビリティへの対処と思われます。

 このサービスは非常に大量のデータトラフィック1秒あたり数百万メッセージを処理するために存在します。よって、メッセージを受信するEvent Consumerは「.NET」環境では「EventProcesseerHost」クラスが用意されています。

 またJava環境ではJMSインターフェイスで各パーティションのConsumerになることが可能ですが、Microsoftは「Apache Storm」でリアルタイム処理をすることを推奨しているようです。



送信データクラス
package foo.bar.data;

/**
 * メッセージサンプルデータクラス
 */
public class TestData {

 public String device;
 public String time;
 public String temperature;
 public String humidity;
 public String nodes;

 public TestData(String device, String time, String temperature,
   String humidity, String nodes) {

  this.device = device;
  this.time = time;
  this.temperature = temperature;
  this.humidity = humidity;
  this.nodes = nodes;
 }
}

送信クラス
package foo.bar.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import foo.bar.data.TestData;

public class EventHubSenderOrgMain {

 /**
  * JMSインターフェイスを用いてEvent Hubsにメッセージを送信するサンプル。
  * 
  * @param args
  * @throws NamingException
  * @throws JMSException
  * @throws IOException
  * @throws InterruptedException
  */
 public static void send() throws NamingException, JMSException,
   IOException, InterruptedException {

  // 初期情報
  Hashtable env = new Hashtable();
  env.put(Context.INITIAL_CONTEXT_FACTORY,
    "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");

  env.put(Context.PROVIDER_URL, "servicebus.properties");
  // イニシャルコンテキスト作成
  Context context = new InitialContext(env);
  // コネクションファクトリ生成
  ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
  // 送信先指定
  Destination queue = (Destination) context.lookup("EventHub");

  // コネクション生成
  Connection connection = cf.createConnection();

  // Create sender-side Session and MessageProducer
  // 送信セッションとメッセージプロデューサ作成
  Session sendSession = connection.createSession(false,
    Session.AUTO_ACKNOWLEDGE);
  MessageProducer sender = sendSession.createProducer(queue);

  for (int i = 0; i < 20; i++) {
   sendBytesMessage(sendSession, sender);
   Thread.sleep(200);
  }
 }

 private static void sendBytesMessage(Session sendSession,
   MessageProducer sender) throws JMSException,
   UnsupportedEncodingException {
  // 送信データの作成
  BytesMessage message = sendSession.createBytesMessage();
  TestData testData = new TestData("DEV-java", new Date().toString(), "28", "70",
    "5");
  // JSONにする。
  message.writeBytes(JsonTranceform.toJson(testData).getBytes("UTF-8"));
  // メッセージを送信する。
  sender.send(message);
  System.out.println("Sent message");
 }

 public static void main(String[] args) {

  try {
   EventHubSenderOrgMain.send();
  } catch (NamingException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (JMSException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

servicebus.properties クラスパスのルートに配備してください。
# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF=amqps://SendRule:[your rule kay]@[your namespace].servicebus.windows.net/?sync-publish=false

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.EventHub=[Event Hub Name]

【スケーラビリティに関する考察】
IoTシステムではセンサーデバイスなどの数が多くなることが予想される、デバイスが送信するデータのストア先は今回のサンプルプロジェクトではSQLサーバを用いたが、よりスループットの高いデータストアに蓄積することを考えなければいけないと思いました。

本日はこの辺で、

0 件のコメント:

コメントを投稿