Apache Stormのインストールとサンプル実行
1.Apache Stormのインストール
# wget http://www-eu.apache.org/dist/storm/apache-storm-1.0.2/apache-storm-1.0.2.tar.gz
# tar xvzf apache-storm-1.0.2.tar.gz
# mv apache-storm-1.0.2.tar.gz /usr/local/lib/strom
# データディレクトリの作成
# cd /usr/local/lib/storm
# mkdir data
2.設定ファイルの作成
# vim conf/storm.yaml
storm.zookeeper.servers:
- "localhost"
storm.local.dir: "/usr/local/lib/storm/data"
nimbus.host: "localhost"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
3.Stormの起動(nimbus/supervisor/ui)
# bin/storm nimbus &
# bin/storm supervisor &
# bin/storm ui &
4.Zookepperの起動(インストールは割愛)
# cd /usr/local/lib/zookeeper
# ./bin/zkServer.sh start &
5.サンプルの実行
# cd /usr/local/lib/storm/examples/storm-starter/
# storm jar target/storm-starter-1.1.0.jar org.apache.storm.starter.ExclamationTopology local
Running: /usr/java/jdk1.8.0_131/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/lib/storm -Dstorm.log.dir=/usr/local/lib/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/lib/storm/lib/storm-core-1.1.0.jar:/usr/local/lib/storm/lib/kryo-3.0.3.jar:/usr/local/lib/storm/lib/reflectasm-1.10.1.jar:/usr/local/lib/storm/lib/asm-5.0.3.jar:/usr/local/lib/storm/lib/minlog-1.3.0.jar:/usr/local/lib/storm/lib/objenesis-2.1.jar:/usr/local/lib/storm/lib/clojure-1.7.0.jar:/usr/local/lib/storm/lib/ring-cors-0.1.5.jar:/usr/local/lib/storm/lib/disruptor-3.3.2.jar:/usr/local/lib/storm/lib/log4j-api-2.8.jar:/usr/local/lib/storm/lib/log4j-core-2.8.jar:/usr/local/lib/storm/lib/log4j-slf4j-impl-2.8.jar:/usr/local/lib/storm/lib/slf4j-api-1.7.21.jar:/usr/local/lib/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/lib/storm/lib/servlet-api-2.5.jar:/usr/local/lib/storm/lib/storm-rename-hack-1.1.0.jar:target/storm-starter-1.1.0.jar:/usr/local/lib/storm/conf:/usr/local/lib/storm/bin -Dstorm.jar=target/storm-starter-1.1.0.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} org.apache.storm.starter.ExclamationTopology local
1890 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8548623279120140126:-5008538972653927482
2155 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
2290 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : localhost:6627
2335 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds
2336 [main] WARN o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
2346 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : localhost:6627
2449 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - jars...
2450 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - artifacts...
2450 [main] INFO o.a.s.StormSubmitter - Dependency Blob keys - jars : / artifacts : []
2476 [main] INFO o.a.s.StormSubmitter - Uploading topology jar target/storm-starter-1.1.0.jar to assigned location: /usr/local/lib/storm/data/nimbus/inbox/stormjar-e3b27253-e04f-4a0b-958b-dbccaf869716.jar
Start uploading file 'target/storm-starter-1.1.0.jar' to '/usr/local/lib/storm/data/nimbus/inbox/stormjar-e3b27253-e04f-4a0b-958b-dbccaf869716.jar' (64300713 bytes)
[==================================================] 64300713 / 64300713
6.結果確認
# ls -l /usr/local/lib/storm/logs/workers-artifacts/
…
drwxr-xr-x 5 root root 39 9月 1 20:17 local-1-1504264614
…
# less /usr/local/lib/storm/logs/workers-artifacts/local-1-1504264614/6701/worker.log
…
2017-09-01 20:18:03.475 o.a.s.d.task Thread-15-word-executor[14 14] [INFO] Emitting: word default [jackson]
2017-09-01 20:18:03.478 o.a.s.d.executor Thread-15-word-executor[14 14] [INFO] TRANSFERING tuple [dest: 5 tuple: source: word:14, stream: default, id: {}, [jackson]]
2017-09-01 20:18:03.482 o.a.s.d.executor Thread-13-exclaim1-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: word:14, stream: default, id: {}, [jackson]
2017-09-01 20:18:03.483 o.a.s.d.task Thread-13-exclaim1-executor[5 5] [INFO] Emitting: exclaim1 default [jackson!!!]
2017-09-01 20:18:03.484 o.a.s.d.executor Thread-13-exclaim1-executor[5 5] [INFO] TRANSFERING tuple [dest: 8 tuple: source: exclaim1:5, stream: default, id: {}, [jackson!!!]]
…
とまあこんな感じ。
Apache Flink インストールメモ
Apache Flinkを触りたくなったのでインストール
1. パッケージインストール
# wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz # tar xvzf flink-0.8.1-bin-hadoop1.tgz # mv flink-1.3.1 /usr/local/lib/flink
2. 起動&動作確認
# cd /usr/local/lib/flink # ./bin/start-local.sh Starting jobmanager daemon on host localhost.localdomain. ワードカウント用ファイルダウンロード # wget -O hamlet.txt http://www.gutenberg.org/cache/epub/1787/pg1787.txt 実行 # ./bin/flink run examples/batch/WordCount.jar -input hamlet.txt -output wordcount-result.txt Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123 Using address localhost:6123 to connect to JobManager. JobManager web interface address http://localhost:8081 … Job with JobID d5f7a38994f48e95a40c24c3a940e202 has finished. Job Runtime: 11847 ms 結果確認 # cat wordcount-result.txt | sort -rnk2 the 1163 and 1039 to 806 of 763 i 631 you 590 a 587 my 520 in 460 it 432 that 427 is 412 ham 358 this 338 not 334 for 310 d 306 his 304 with 295 but 278 your 261
ハムレットで一番でてくる単語はtheでした。
Twitterからデータを取得する(1)
1.pip/oauthをインストール
# curl -kL https://bootstrap.pypa.io/get-pip.py | python # pip install requests requests_oauthlib
2. コードを書く
# mkdir twitter # cd twitter
設定ファイルを作成。
# vim settings.py CONSUMER_KEY = "***" CONSUMER_SECRET = "***" ACCESS_TOKEN = "***" ACCESS_TOKEN_SECRET = "***" ★上記のトークンやキーの作成は、以下のサイトを参考にした。 [https://syncer.jp/twitter-api-matome:title]
Tweetデータを取得するコード。
# vim timeline.py # -*- coding: utf-8 -*- import tweepy import settings auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET) auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET) api = tweepy.API(auth) public_tweets = api.home_timeline() for tweet in public_tweets: print tweet.text.encode('utf-8')
これで20件のデータを取得できる。
3.Streaming APIを試してみる。
以下のサンプルコードを作成した。
# -*- coding: utf-8 -*- import tweepy import time import settings class StreamListener(tweepy.StreamListener): def __init__(self): super(StreamListener,self).__init__() def on_status(self, status): print '----------' print status.created_at print status.text.encode('utf-8') return True def on_error(self, status_code): print str(status_code) return False if __name__ == "__main__": auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET) auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET) api = tweepy.API(auth) stream = tweepy.Stream(auth=api.auth, listener=StreamListener()) while True: try: stream.filter(languages=['ja'], track=[u'tokyo']) except: time.sleep(60) stream = tweepy.Stream(auth=api.auth, listener=StreamListener())
これでコンソールにだらだらとTweetが表示される。面白い!
DockerでHadoopを動かす(1)
DockerでHadoopを動かしてみる。
基本は以下のサイトどおり。
https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html
1.Hadoopインストール
wget http://apache.claz.org/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz tar xvzf hadoop-2.7.2.tar.gz mv hadoop-2.7.2 /usr/local/lib/hadoop
2.サンプル実行
/usr/local/lib/hadoop/bin/hadoop jar /usr/local/lib/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar pi 10 2000
3.HDFSにファイルを入れてみる。
# cd /usr/local/lib/hadoop # bin/hdfs namenode -format # sbin/start-dfs.sh
sshが入っていないのでopenssh-clientsとopenssh-serverをyumで入れる。
yum install -y openssh-clients yum install -y openssh-server
dockerコンテナはいったんhadoopという名前をつけてcommitしておく。
commitしたイメージを再度コンテナとして起動するが、
systemctでsshdを起動しようとすると権限の問題で怒られるので、
以下の起動の仕方で起動する。
# docker run --privileged -d hadoop /sbin/init # docker exec -ti container_id bash
しかしここでJAVA_HOMEが設定されてないと怒られる。
/root/.bash_profileや/etc/profileにexportを記述しても解決しないので、
以下のファイルのexport JAVA_HOME=…を上書きした。
# vim etc/hadoop/hadoop-env.sh
# bin/hdfs dfs -mkdir /user # bin/hdfs dfs -put etc/hadoop/ /user # bin/hdfs dfs -ls /user drwxr-xr-x - root root 4096 2016-07-27 03:22 /user/hadoop # bin/hdfs dfs -ls /user/hadoop rw-r--r-- 1 root root 4436 2016-07-27 03:22 /user/hadoop/capacity-scheduler.xml -rw-r--r-- 1 root root 1335 2016-07-27 03:22 /user/hadoop/configuration.xsl -rw-r--r-- 1 root root 318 2016-07-27 03:22 /user/hadoop/container-executor.cfg -rw-r--r-- 1 root root 774 2016-07-27 03:22 /user/hadoop/core-site.xml -rw-r--r-- 1 root root 3670 2016-07-27 03:22 /user/hadoop/hadoop-env.cmd -rw-r--r-- 1 root root 4266 2016-07-27 03:22 /user/hadoop/hadoop-env.sh -rw-r--r-- 1 root root 2490 2016-07-27 03:22 /user/hadoop/hadoop-metrics.properties -rw-r--r-- 1 root root 2598 2016-07-27 03:22 /user/hadoop/hadoop-metrics2.properties -rw-r--r-- 1 root root 9683 2016-07-27 03:22 /user/hadoop/hadoop-policy.xml …(省略)
ということで、HDFSまで動かすことができました。
Docker上でKafkaを動かす(2)
今回は前回構築した1コンテナでのKafkaを複数コンテナにしてみる。
複数コンテナを構成するには、docker-composeを利用すると便利らしい。
ということでdocker-composeを使ってみる。
1.Docker-composeのインストール
# curl -L https://github.com/docker/compose/releases/download/1.7.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose # chmod +x /usr/local/bin/docker-compose
2.Docker-composeのyamlファイルを作成する。
今回は4つコンテナを起動し、各々にBrokerが起動するようにする。
以下の様なyamlを作成した。
version: '2' services: kafka1: image: kafka container_name: kafka1 hostname: kafka1 networks: kafka_default: ipv4_address: 172.16.238.10 ports: - "2181" - "9092" extra_hosts: - "kafka2:172.16.238.11" - "kafka3:172.16.238.12" - "kafka4:172.16.238.13" stdin_open: true tty: true kafka2: image: kafka container_name: kafka2 hostname: kafka2 networks: kafka_default: ipv4_address: 172.16.238.11 ports: - "2181" - "9092" extra_hosts: - "kafka1:172.16.238.10" - "kafka3:172.16.238.12" - "kafka4:172.16.238.13" stdin_open: true tty: true kafka3: image: kafka container_name: kafka3 hostname: kafka3 networks: kafka_default: ipv4_address: 172.16.238.12 ports: - "2181" - "9092" extra_hosts: - "kafka1:172.16.238.10" - "kafka2:172.16.238.11" - "kafka4:172.16.238.13" stdin_open: true tty: true kafka4: image: kafka container_name: kafka4 hostname: kafka4 networks: kafka_default: ipv4_address: 172.16.238.13 ports: - "2181" - "9092" extra_hosts: - "kafka1:172.16.238.10" - "kafka2:172.16.238.11" - "kafka3:172.16.238.12" stdin_open: true tty: true networks: kafka_default: driver: bridge driver_opts: networks: kafka_default: driver: bridge driver_opts: com.docker.network.enable_ipv6: "true" ipam: driver: default config: - subnet: 172.16.238.0/24 gateway: 172.16.238.1 - subnet: 2001:3984:3989::/64 gateway: 2001:3984:3989::1
3.コンテナを起動&設定する
# docker-compose up
以下は各コンテナで設定する。
★zookeeper.properties
以下を追加(全コンテナ共通)
server.1=kafka1:2888:3888 server.2=kafka2:2888:3888 server.3=kafka3:2888:3888 server.4=kafka4:2888:3888 initLimit=5 syncLimit=2
★myid
kakfa1: # echo 1 > /tmp/zookeeper/myid kakfa2: # echo 2 > /tmp/zookeeper/myid kakfa3: # echo 3 > /tmp/zookeeper/myid kakfa4: # echo 4 > /tmp/zookeeper/myid
★server.properties
kakfa1: broker.id=1 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181 host.name=kafka1 kakfa2: broker.id=2 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181 host.name=kafka2 kakfa3: broker.id=3 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181 host.name=kafka3 kakfa3: broker.id=3 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181,kafka4:2181 host.name=kafka3
Zookeeperとkafkaを起動する。面倒なので一括で起動する。
# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec -d {} /usr/local/lib/kafka/bin/zookeeper-server-start.sh /usr/local/lib/kafka/config/zookeeper.properties # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec -d {} /usr/local/lib/kafka/bin/kafka-server-start.sh /usr/local/lib/kafka/config/server.properties
トピックを作成する。
# kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
トピックが作成されたか確認する。
# docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls /tmp/kafka-logs kafka1: cleaner-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint kafka2: cleaner-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint kafka3: cleaner-offset-checkpoint meta.properties mytopic-0★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka4: cleaner-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint (上記の表示は編集上加工してます)
今度はパーティションを4つにしてトピックを作成してみる。
# /usr/local/lib/kafka/bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 4 --topic mytopic_part4 # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls /tmp/kafka-logs kafka1: cleaner-offset-checkpoint meta.properties mytopic_part4-3★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka2 cleaner-offset-checkpoint meta.properties mytopic_part4-0★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka3: cleaner-offset-checkpoint meta.properties mytopic-0 mytopic_part4-1★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka4: cleaner-offset-checkpoint meta.properties mytopic_part4-2★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint
今度はパーティションを4つにして、かつレプリカ3でトピックを作成してみる。
# /usr/local/lib/kafka/bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 3 --partitions 4 --topic mytopic_part4_rep3 kafka1: cleaner-offset-checkpoint meta.properties mytopic_part4-3 mytopic_part4_rep3-0★(作成されたトピック) mytopic_part4_rep3-1★(作成されたトピック) mytopic_part4_rep3-2★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka2: cleaner-offset-checkpoint meta.properties mytopic_part4-0 mytopic_part4_rep3-1★(作成されたトピック) mytopic_part4_rep3-2★(作成されたトピック) mytopic_part4_rep3-3★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka3: cleaner-offset-checkpoint meta.properties mytopic-0 mytopic_part4-1 mytopic_part4_rep3-0★(作成されたトピック) mytopic_part4_rep3-2★(作成されたトピック) mytopic_part4_rep3-3★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint kafka4: cleaner-offset-checkpoint meta.properties mytopic_part4-2 mytopic_part4_rep3-0★(作成されたトピック) mytopic_part4_rep3-1★(作成されたトピック) mytopic_part4_rep3-3★(作成されたトピック) recovery-point-offset-checkpoint replication-offset-checkpoint
トピックにメッセージを送る。
# /usr/local/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic_part4_rep 12345 (Ctrl-C) # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-0 total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:22 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-0: No such file or directory total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-1 total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-1: No such file or directory total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-2 total 0 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 0 Jul 23 19:23 00000000000000000000.log total 0 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 0 Jul 23 19:23 00000000000000000000.log total 0 -rw-r--r-- 1 root root 10485760 Jul 23 19:22 00000000000000000000.index -rw-r--r-- 1 root root 0 Jul 23 19:22 00000000000000000000.log ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-2: No such file or directory # docker ps | sed '1d' | awk '{print$1}' | xargs -I{} docker exec {} ls -l /tmp/kafka-logs/mytopic_part4_rep3-3 ls: cannot access /tmp/kafka-logs/mytopic_part4_rep3-3: No such file or directory total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log total 4 -rw-r--r-- 1 root root 10485760 Jul 23 19:23 00000000000000000000.index -rw-r--r-- 1 root root 34 Jul 23 19:45 00000000000000000000.log
全体ではパーティション✕レプリカの数だけディレクトリが作成され、
メッセージは上記の数に分散して保存される。
今回のメッセージはmytopic_part4_rep3-0, mytopic_part4_rep3-1, mytopic_part4_rep3-3に格納されている。
こんな感じでした。
Docker上でKafkaを動かす(1)
前回Sparkを入れてみたが、前段のデータのキューの部分を実現したくなった。
事情により今回からCentOS7の仮想マシン上に構築している。
1.まずはDockerを入れる。
# yum -y install docker-io
2.Docker上でCentos7のイメージを起動する。
コンテナが起動したら自動で設定されるDNSを設定する # vim /etc/sysconfig/docker other_args="-dns 8.8.8.8" コンテナをダウンロードして起動する。 # docker pull centos:centos7 # docker run -i -t centos:centos7 /bin/bash
3.DockerコンテナZookeeperとKafkaを入れる
以降はDockerコンテナ上での操作。
3-1.Dockerコンテナ上にzookeeperをインストール
# yum install -y wget
とここで以下のようなエラーがでた。
Error: database disk image is malformed
どうも yum cleanをする必要があるらしい。
# yum clean all
仕切りなおしてZookeeperを入れる
# wget http://mirror.reverse.net/pub/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz # tar xvzf zookeeper-3.4.8.tar.gz # mv zookeeper-3.4.8 /usr/local/lib/zookeeper
3-2.Dockerコンテナ上にKafkaをインストール
Scalaを入れる
#rpm -ivh http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.rpm
続いてKafkaをインストール
# wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz # tar xvzf kafka_2.11-0.10.0.0.tgz # mv kafka_2.11-0.10.0.0 /usr/local/lib/kafka
ここでいったんホスト側からコンテナをイメージ化しておく
# docker commit container_id kafka
4.起動
まずはコンテナを起動する。
# docker run -i -t kafka /bin/bash
Zookeeperを起動。
# cd /usr/local/lib # kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties &
Kafkaを起動
# kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties &
起動しているか確認する。
# ps aux | grep zookeeper # ps aux | grep kafka
トピックを作成する。トピック名はmytopicにする。トピックを確認する。
# kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic # kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 mytopic
トピックに対してメッセージ送信
# kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic Test Message. (Ctrl-C)
トピックからメッセージを受信
# kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning Test Message. (Ctrl-C)
メッセージ自体の格納場所を確認。
# cat /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Test Message.
とりあえずこんな感じ。
次は複数のBrokerでどういった振る舞いになるか確認して見る。
Sparkいれてみた
CentOS 6.8に入れてみる。
1.インストール
・scala # rpm -ivh http://downloads.lightbend.com/scala/2.10.6/scala-2.10.6.rpm ・spark # wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz # tar xvzf spark-1.6.2-bin-hadoop2.6.tgz # mv spark-1.6.2-bin-hadoop2.6 /usr/local/lib/spark
2.動作確認
# cd /usr/local/lib/spark # ./bin/spark-shell log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties To adjust logging level use sc.setLogLevel("INFO") Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.6.2 /_/ Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.7.0_99) Type in expressions to have them evaluated. Type :help for more information.
これで動いたらしい。
いくつかサンプルがあるので動かしてみる。
3.サンプルの実行
# cd examples/src/main/python # ls als.py cassandra_outputformat.py kmeans.py mllib pi.py status_api_demo.py wordcount.py avro_inputformat.py hbase_inputformat.py logistic_regression.py pagerank.py sort.py streaming cassandra_inputformat.py hbase_outputformat.py ml parquet_inputformat.py sql.py transitive_closure.py
とりあえずword_conunt.pyを選択。
文字数を数えてくれるものらしい。
# ../../../../bin/spark-submit wordcount.py ../../../../README.md help: 1 when: 1 Hadoop: 3 "local": 1 including: 3 …
とりあえずこんなもん。
次はkafkaと動かしてみたいな。