DIY

とりあえずやってみるのメモ。技術的なメモもありますが、独り言もあります。

Apache Stormのインストールとサンプル実行

1.Apache Stormのインストール

# wget http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
# tar xvzf apache-storm-1.0.2.tar.gz
# mv apache-storm-1.0.2.tar.gz /usr/local/lib/strom

# データディレクトリの作成
# cd /usr/local/lib/storm
# mkdir data

2.設定ファイルの作成

# vim conf/storm.yaml
storm.zookeeper.servers:
- "localhost"
storm.local.dir: "/usr/local/lib/storm/data"
nimbus.host: "localhost"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

3.Stormの起動(nimbus/supervisor/ui)

# bin/storm nimbus &
# bin/storm supervisor &
# bin/storm ui &

4.Zookepperの起動(インストールは割愛)

# cd /usr/local/lib/zookeeper
# ./bin/zkServer.sh start &

5.サンプルの実行

# cd /usr/local/lib/storm/examples/storm-starter/
# storm jar target/storm-starter-1.1.0.jar org.apache.storm.starter.ExclamationTopology local
Running: /usr/java/jdk1.8.0_131/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/lib/storm -Dstorm.log.dir=/usr/local/lib/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/lib/storm/lib/storm-core-1.1.0.jar:/usr/local/lib/storm/lib/kryo-3.0.3.jar:/usr/local/lib/storm/lib/reflectasm-1.10.1.jar:/usr/local/lib/storm/lib/asm-5.0.3.jar:/usr/local/lib/storm/lib/minlog-1.3.0.jar:/usr/local/lib/storm/lib/objenesis-2.1.jar:/usr/local/lib/storm/lib/clojure-1.7.0.jar:/usr/local/lib/storm/lib/ring-cors-0.1.5.jar:/usr/local/lib/storm/lib/disruptor-3.3.2.jar:/usr/local/lib/storm/lib/log4j-api-2.8.jar:/usr/local/lib/storm/lib/log4j-core-2.8.jar:/usr/local/lib/storm/lib/log4j-slf4j-impl-2.8.jar:/usr/local/lib/storm/lib/slf4j-api-1.7.21.jar:/usr/local/lib/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/lib/storm/lib/servlet-api-2.5.jar:/usr/local/lib/storm/lib/storm-rename-hack-1.1.0.jar:target/storm-starter-1.1.0.jar:/usr/local/lib/storm/conf:/usr/local/lib/storm/bin -Dstorm.jar=target/storm-starter-1.1.0.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} org.apache.storm.starter.ExclamationTopology local
1890 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8548623279120140126:-5008538972653927482
2155 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
2290 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : localhost:6627
2335 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds
2336 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
2346 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : localhost:6627
2449 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - jars...
2450 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - artifacts...
2450 [main] INFO o.a.s.StormSubmitter - Dependency Blob keys - jars :
/ artifacts : []
2476 [main] INFO o.a.s.StormSubmitter - Uploading topology jar target/storm-starter-1.1.0.jar to assigned location: /usr/local/lib/storm/data/nimbus/inbox/stormjar-e3b27253-e04f-4a0b-958b-dbccaf869716.jar
Start uploading file 'target/storm-starter-1.1.0.jar' to '/usr/local/lib/storm/data/nimbus/inbox/stormjar-e3b27253-e04f-4a0b-958b-dbccaf869716.jar' (64300713 bytes)
[==================================================] 64300713 / 64300713

6.結果確認

# ls -l /usr/local/lib/storm/logs/workers-artifacts/

drwxr-xr-x 5 root root 39 9月 1 20:17 local-1-1504264614

# less /usr/local/lib/storm/logs/workers-artifacts/local-1-1504264614/6701/worker.log

2017-09-01 20:18:03.475 o.a.s.d.task Thread-15-word-executor[14 14] [INFO] Emitting: word default [jackson]
2017-09-01 20:18:03.478 o.a.s.d.executor Thread-15-word-executor[14 14] [INFO] TRANSFERING tuple [dest: 5 tuple: source: word:14, stream: default, id: {}, [jackson]]
2017-09-01 20:18:03.482 o.a.s.d.executor Thread-13-exclaim1-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: word:14, stream: default, id: {}, [jackson]
2017-09-01 20:18:03.483 o.a.s.d.task Thread-13-exclaim1-executor[5 5] [INFO] Emitting: exclaim1 default [jackson!!!]
2017-09-01 20:18:03.484 o.a.s.d.executor Thread-13-exclaim1-executor[5 5] [INFO] TRANSFERING tuple [dest: 8 tuple: source: exclaim1:5, stream: default, id: {}, [jackson!!!]]

とまあこんな感じ。

Apache Flink インストールメモ

Apache Flinkを触りたくなったのでインストール

1. パッケージインストール

# wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz
# tar xvzf flink-0.8.1-bin-hadoop1.tgz
# mv flink-1.3.1 /usr/local/lib/flink 

2. 起動&動作確認

# cd /usr/local/lib/flink
# ./bin/start-local.sh
Starting jobmanager daemon on host localhost.localdomain.

ワードカウント用ファイルダウンロード
# wget -O hamlet.txt http://www.gutenberg.org/cache/epub/1787/pg1787.txt

実行
# ./bin/flink run examples/batch/WordCount.jar -input hamlet.txt -output wordcount-result.txt
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
…
Job with JobID d5f7a38994f48e95a40c24c3a940e202 has finished.
Job Runtime: 11847 ms

結果確認
# cat wordcount-result.txt | sort -rnk2
the 1163
and 1039
to 806
of 763
i 631
you 590
a 587
my 520
in 460
it 432
that 427
is 412
ham 358
this 338
not 334
for 310
d 306
his 304
with 295
but 278
your 261

ハムレットで一番でてくる単語はtheでした。

Twitterからデータを取得する(1)

Twitter APIを利用してデータを取得してみる。

1.pip/oauthをインストール

# curl -kL https://bootstrap.pypa.io/get-pip.py | python
# pip install requests requests_oauthlib


2. コードを書く

# mkdir twitter
# cd twitter

設定ファイルを作成。

# vim settings.py
 
CONSUMER_KEY = "***"
CONSUMER_SECRET = "***"
ACCESS_TOKEN = "***"
ACCESS_TOKEN_SECRET = "***"

★上記のトークンやキーの作成は、以下のサイトを参考にした。
[https://syncer.jp/twitter-api-matome:title]

Tweetデータを取得するコード。

# vim timeline.py

# -*- coding: utf-8 -*-
import tweepy
import settings

auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET)

api = tweepy.API(auth)

public_tweets = api.home_timeline()

for tweet in public_tweets:
    print tweet.text.encode('utf-8')

これで20件のデータを取得できる。

3.Streaming APIを試してみる。
以下のサンプルコードを作成した。

# -*- coding: utf-8 -*-
import tweepy

import time
import settings

class StreamListener(tweepy.StreamListener):
    def __init__(self):
        super(StreamListener,self).__init__()

    def on_status(self, status):
        print '----------'
        print status.created_at
        print status.text.encode('utf-8')
        return True

    def on_error(self, status_code):
        print str(status_code)
        return False

if __name__ == "__main__":
    auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
    auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET)

    api = tweepy.API(auth)

    stream = tweepy.Stream(auth=api.auth, listener=StreamListener())

    while True:
      try:
        stream.filter(languages=['ja'], track=[u'tokyo'])
      except:
        time.sleep(60)
        stream = tweepy.Stream(auth=api.auth, listener=StreamListener())


これでコンソールにだらだらとTweetが表示される。面白い!

DockerでHadoopを動かす(1)

DockerでHadoopを動かしてみる。
基本は以下のサイトどおり。

https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html


1.Hadoopインストール

wget http://apache.claz.org/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz
tar xvzf hadoop-2.7.2.tar.gz
mv hadoop-2.7.2 /usr/local/lib/hadoop

2.サンプル実行

/usr/local/lib/hadoop/bin/hadoop jar /usr/local/lib/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar pi 10 2000

3.HDFSにファイルを入れてみる。

# cd /usr/local/lib/hadoop
# bin/hdfs namenode -format
# sbin/start-dfs.sh

sshが入っていないのでopenssh-clientsとopenssh-serverをyumで入れる。

yum install -y openssh-clients
yum install -y openssh-server

dockerコンテナはいったんhadoopという名前をつけてcommitしておく。
commitしたイメージを再度コンテナとして起動するが、
systemctでsshdを起動しようとすると権限の問題で怒られるので、
以下の起動の仕方で起動する。

# docker run --privileged -d hadoop /sbin/init
# docker exec -ti container_id bash

しかしここでJAVA_HOMEが設定されてないと怒られる。
/root/.bash_profileや/etc/profileにexportを記述しても解決しないので、
以下のファイルのexport JAVA_HOME=…を上書きした。

# vim etc/hadoop/hadoop-env.sh
# bin/hdfs dfs -mkdir /user
# bin/hdfs dfs -put etc/hadoop/ /user
# bin/hdfs dfs -ls /user
drwxr-xr-x   - root root       4096 2016-07-27 03:22 /user/hadoop

# bin/hdfs dfs -ls /user/hadoop
rw-r--r--   1 root root       4436 2016-07-27 03:22 /user/hadoop/capacity-scheduler.xml
-rw-r--r--   1 root root       1335 2016-07-27 03:22 /user/hadoop/configuration.xsl
-rw-r--r--   1 root root        318 2016-07-27 03:22 /user/hadoop/container-executor.cfg
-rw-r--r--   1 root root        774 2016-07-27 03:22 /user/hadoop/core-site.xml
-rw-r--r--   1 root root       3670 2016-07-27 03:22 /user/hadoop/hadoop-env.cmd
-rw-r--r--   1 root root       4266 2016-07-27 03:22 /user/hadoop/hadoop-env.sh
-rw-r--r--   1 root root       2490 2016-07-27 03:22 /user/hadoop/hadoop-metrics.properties
-rw-r--r--   1 root root       2598 2016-07-27 03:22 /user/hadoop/hadoop-metrics2.properties
-rw-r--r--   1 root root       9683 2016-07-27 03:22 /user/hadoop/hadoop-policy.xml
…(省略)

ということで、HDFSまで動かすことができました。

Docker上でKafkaを動かす(2)

今回は前回構築した1コンテナでのKafkaを複数コンテナにしてみる。
複数コンテナを構成するには、docker-composeを利用すると便利らしい。
ということでdocker-composeを使ってみる。

1.Docker-composeのインストール

# curl -L https://github.com/docker/compose/releases/download/1.7.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
# chmod +x /usr/local/bin/docker-compose


2.Docker-composeのyamlファイルを作成する。
今回は4つコンテナを起動し、各々にBrokerが起動するようにする。
以下の様なyamlを作成した。

version: '2'

services:
  kafka1:
    image: kafka
    container_name: kafka1
    hostname: kafka1
    networks:
      kafka_default:
        ipv4_address: 172.16.238.10
    ports:
        - "2181"
        - "9092"
    extra_hosts:
        - "kafka2:172.16.238.11"
        - "kafka3:172.16.238.12"
        - "kafka4:172.16.238.13"
    stdin_open: true
    tty: true


  kafka2:
    image: kafka
    container_name: kafka2
    hostname: kafka2
    networks:
      kafka_default:
        ipv4_address: 172.16.238.11
    ports:
        - "2181"
        - "9092"
    extra_hosts:
        - "kafka1:172.16.238.10"
        - "kafka3:172.16.238.12"
        - "kafka4:172.16.238.13"
    stdin_open: true
    tty: true

  kafka3:
    image: kafka
    container_name: kafka3
    hostname: kafka3
    networks:
      kafka_default:
        ipv4_address: 172.16.238.12
    ports:
        - "2181"
        - "9092"
    extra_hosts:
        - "kafka1:172.16.238.10"
        - "kafka2:172.16.238.11"
        - "kafka4:172.16.238.13"
    stdin_open: true
    tty: true

  kafka4:
    image: kafka
    container_name: kafka4
    hostname: kafka4
    networks:
      kafka_default:
        ipv4_address: 172.16.238.13
    ports:
        - "2181"
        - "9092"
    extra_hosts:
        - "kafka1:172.16.238.10"
        - "kafka2:172.16.238.11"
        - "kafka3:172.16.238.12"
    stdin_open: true
    tty: true
networks:
  kafka_default:
    driver: bridge
    driver_opts:

networks:
  kafka_default:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "true"
    ipam:
      driver: default
      config:
      - subnet: 172.16.238.0/24
        gateway: 172.16.238.1
      - subnet: 2001:3984:3989::/64
        gateway: 2001:3984:3989::1


3.コンテナを起動&設定する

# docker-compose up

以下は各コンテナで設定する。
★zookeeper.properties
以下を追加(全コンテナ共通)

server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
server.4=kafka4:2888:3888
initLimit=5
syncLimit=2

★myid

kakfa1: # echo 1 > /tmp/zookeeper/myid
kakfa2: # echo 2 > /tmp/zookeeper/myid
kakfa3: # echo 3 > /tmp/zookeeper/myid
kakfa4: # echo 4 > /tmp/zookeeper/myid

★server.properties

kakfa1:
broker.id=1
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181
host.name=kafka1


kakfa2:
broker.id=2
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181
host.name=kafka2


kakfa3:
broker.id=3
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181
host.name=kafka3


kakfa3:
broker.id=3
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181
host.name=kafka3

Zookeeperとkafkaを起動する。面倒なので一括で起動する。

# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec -d {} /usr/local/lib/kafka/bin/zookeeper-server-start.sh /usr/local/lib/kafka/config/zookeeper.properties
# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec -d {} /usr/local/lib/kafka/bin/kafka-server-start.sh /usr/local/lib/kafka/config/server.properties

トピックを作成する。

# kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic

トピックが作成されたか確認する。

# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls /tmp/kafka-logs


kafka1:
cleaner-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka2:
cleaner-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka3:
cleaner-offset-checkpoint
meta.properties
mytopic-0★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka4:
cleaner-offset-checkpoint
meta.properties
recovery-point-offset-checkpoint
replication-offset-checkpoint

(上記の表示は編集上加工してます)


今度はパーティションを4つにしてトピックを作成してみる。

# /usr/local/lib/kafka/bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 4 --topic mytopic_part4
# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls /tmp/kafka-logs

kafka1:
cleaner-offset-checkpoint
meta.properties
mytopic_part4-3★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka2
cleaner-offset-checkpoint
meta.properties
mytopic_part4-0★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka3:
cleaner-offset-checkpoint
meta.properties
mytopic-0
mytopic_part4-1★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint


kafka4:
cleaner-offset-checkpoint
meta.properties
mytopic_part4-2★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint


今度はパーティションを4つにして、かつレプリカ3でトピックを作成してみる。

# /usr/local/lib/kafka/bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 3 --partitions 4 --topic mytopic_part4_rep3

kafka1:
cleaner-offset-checkpoint
meta.properties
mytopic_part4-3
mytopic_part4_rep3-0★(作成されたトピック)
mytopic_part4_rep3-1★(作成されたトピック)
mytopic_part4_rep3-2★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint

kafka2:
cleaner-offset-checkpoint
meta.properties
mytopic_part4-0
mytopic_part4_rep3-1★(作成されたトピック)
mytopic_part4_rep3-2★(作成されたトピック)
mytopic_part4_rep3-3★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint

kafka3:
cleaner-offset-checkpoint
meta.properties
mytopic-0
mytopic_part4-1
mytopic_part4_rep3-0★(作成されたトピック)
mytopic_part4_rep3-2★(作成されたトピック)
mytopic_part4_rep3-3★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint

kafka4:
cleaner-offset-checkpoint
meta.properties
mytopic_part4-2
mytopic_part4_rep3-0★(作成されたトピック)
mytopic_part4_rep3-1★(作成されたトピック)
mytopic_part4_rep3-3★(作成されたトピック)
recovery-point-offset-checkpoint
replication-offset-checkpoint

トピックにメッセージを送る。

# /usr/local/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic_part4_rep
12345
(Ctrl-C)
# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-0
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:22 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-0: No such file or directory
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log

# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-1
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-1: No such file or directory
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log

# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-2
total 0
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root        0 Jul 23 19:23 00000000000000000000.log
total 0
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root        0 Jul 23 19:23 00000000000000000000.log
total 0
-rw-r--r-- 1 root root 10485760 Jul 23 19:22 00000000000000000000.index
-rw-r--r-- 1 root root        0 Jul 23 19:22 00000000000000000000.log
ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-2: No such file or directory

# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-3
ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-3: No such file or directory
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log
total 4
-rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index
-rw-r--r-- 1 root root       34 Jul 23 19:45 00000000000000000000.log

全体ではパーティション✕レプリカの数だけディレクトリが作成され、
メッセージは上記の数に分散して保存される。
今回のメッセージはmytopic_part4_rep3-0, mytopic_part4_rep3-1, mytopic_part4_rep3-3に格納されている。

こんな感じでした。

Docker上でKafkaを動かす(1)

前回Sparkを入れてみたが、前段のデータのキューの部分を実現したくなった。
事情により今回からCentOS7の仮想マシン上に構築している。


1.まずはDockerを入れる。

# yum -y install docker-io

2.Docker上でCentos7のイメージを起動する。

コンテナが起動したら自動で設定されるDNSを設定する
# vim /etc/sysconfig/docker
other_args="-dns 8.8.8.8"

コンテナをダウンロードして起動する。
# docker pull centos:centos7
# docker run -i -t centos:centos7 /bin/bash

3.DockerコンテナZookeeperとKafkaを入れる

以降はDockerコンテナ上での操作。

3-1.Dockerコンテナ上にzookeeperをインストール

# yum install -y wget

とここで以下のようなエラーがでた。
Error: database disk image is malformed
どうも yum cleanをする必要があるらしい。

# yum clean all 

仕切りなおしてZookeeperを入れる

# wget http://mirror.reverse.net/pub/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
# tar xvzf zookeeper-3.4.8.tar.gz
# mv zookeeper-3.4.8 /usr/local/lib/zookeeper

3-2.Dockerコンテナ上にKafkaをインストール
Scalaを入れる

#rpm -ivh http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.rpm

続いてKafkaをインストール

# wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
# tar xvzf kafka_2.11-0.10.0.0.tgz
# mv kafka_2.11-0.10.0.0 /usr/local/lib/kafka

ここでいったんホスト側からコンテナをイメージ化しておく

# docker commit container_id kafka

4.起動
まずはコンテナを起動する。

# docker run -i -t kafka /bin/bash

Zookeeperを起動。

# cd /usr/local/lib
# kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties &

Kafkaを起動

# kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties &

起動しているか確認する。

# ps aux | grep zookeeper
# ps aux | grep kafka

トピックを作成する。トピック名はmytopicにする。トピックを確認する。

# kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
# kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 mytopic

トピックに対してメッセージ送信

#  kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Test Message.
(Ctrl-C)

トピックからメッセージを受信

# kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
Test Message.
(Ctrl-C)

メッセージ自体の格納場所を確認。

# cat /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Test Message.

とりあえずこんな感じ。
次は複数のBrokerでどういった振る舞いになるか確認して見る。

Sparkいれてみた

CentOS 6.8に入れてみる。


1.インストール

・scala
# rpm -ivh http://downloads.lightbend.com/scala/2.10.6/scala-2.10.6.rpm

・spark
# wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
# tar xvzf spark-1.6.2-bin-hadoop2.6.tgz
# mv spark-1.6.2-bin-hadoop2.6 /usr/local/lib/spark

2.動作確認

# cd /usr/local/lib/spark
# ./bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.7.0_99)
Type in expressions to have them evaluated.
Type :help for more information.

これで動いたらしい。
いくつかサンプルがあるので動かしてみる。

3.サンプルの実行

# cd examples/src/main/python
# ls 
als.py                    cassandra_outputformat.py  kmeans.py               mllib                   pi.py    status_api_demo.py     wordcount.py
avro_inputformat.py       hbase_inputformat.py       logistic_regression.py  pagerank.py             sort.py  streaming
cassandra_inputformat.py  hbase_outputformat.py      ml                      parquet_inputformat.py  sql.py   transitive_closure.py

とりあえずword_conunt.pyを選択。
文字数を数えてくれるものらしい。

# ../../../../bin/spark-submit wordcount.py ../../../../README.md
help: 1
when: 1
Hadoop: 3
"local": 1
including: 3

とりあえずこんなもん。
次はkafkaと動かしてみたいな。