Spark-安装

0x01 zookeeper集群搭建

安装 Java

从官网下载最新版 Java 就可以,Spark官方说明 Java 只要是6以上的版本都可以,我下的是 jdk-8-linux-x64.gz; 在~/workspace目录下直接解压

1
2
3
4
5
6
7
8
9
tar -zxvf jdk-7u75-linux-x64.gz
修改环境变量sudo vi /etc/profile,添加下列内容,注意将home路径替换成你的:

export WORK_SPACE=/home/spark/workspace/
export JAVA_HOME=$WORK_SPACE/jdk1.8
export JRE_HOME=/home/spark/work/jdk1.8/jre
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=$CLASSPATH:.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
然后使环境变量生效,并验证 Java 是否安装成功

$ source /etc/profile #生效环境变量
$ java -version #如果打印出如下版本信息,则说明安装成功

1
2
3
java version "1.8.0_75"
Java(TM) SE Runtime Environment (build 1.8.0_75-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.75-b04, mixed mode)

安装zookeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sudo wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local
sudo mv /usr/local/kafka_2.11-0.10.1.0 /usr/local/kafka
cd /usr/local/kafka/config/

sudo vi zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
dataLogDir=/usr/local/kafka/log/zookeeper
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
synvLimit=5

server.1=192.168.1.1:2888:3888
server.1=192.168.1.2:2888:3888


sudo mkdir /usr/local/kafka/zookeeper
sudo mkdir -p /usr/local/kafka/log/zookeeper
cd /usr/local/kafka/zookeeper/

# myid是zk集群用来发现彼此的标识,必须创建,且不能相同;
sudo vi myid
1 #每台机器不同


进入kafka目录 执行启动zookeeper命令:
cd /usr/local/kafka/bin/
sudo nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
三台机器都执行启动命令,查看zookeeper的日志文件,没有报错就说明zookeeper集群启动成功了。

0x02 kafka集群搭建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
sudo mkdir -p /usr/local/kafka/log/kafka
cd config
broker.id=1 #每台机器不同
port=9092
host.name=192.168.1.1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/log/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181
zookeeper.connection.timeout.ms=6000

cd /usr/local/kafka/
sudo ./bin/kafka-server-start.sh -daemon config/server.properties

更新kafka的offset异常问题:
bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group test-group –reset-offsets –all-topics –to-current –execute
bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group test-group –reset-offsets –all-topics –to-offset 500000 –execute –更新到指定位置
bin/kafka-consumer-groups.sh –bootstrap-server localhost:9092 –group test-group –reset-offsets –all-topics –to-earliest –execute –更新到最初位置

0x03 kafka管理界面

1
2
3
4
5
6
7
sudo unzip kafka-manager-1.3.3.7.zip -d /usr/local
sudo mv /usr/local/kafka-manager-1.3.3.7/ /usr/local/kafka-manager
cd /usr/local/kafka-manager
sudo vi conf/application.conf
kafka-manager.zkhosts="192.168.1.1:2181,192.168.1.2:2181" # 如果zk是集群,这里填写多个zk地址

sudo nohup ./bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

0x04 flume配置

下载flume的包,查看lib文件夹下是否存在kafka依赖库文件
cd /chj/soft/flume-1.8.0/
./bin/flume-ng agent –conf conf –conf-file conf/flume-conf.properties –name a1 -Dflume.root.logger=INFO,console

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = weblog
a1.sinks.k1.brokerList = 192.168.1.1:9092,192.168.1.2:9092 #把日志写入到kafka
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

0x05 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
创建:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest

生产:
bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic test-a

消费:
bin/kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --from-beginning --topic test-a


列出已创建的topic列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181
查看消费者是否连接到group,并消费偏移量
sudo ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.1:9092 --group ghh --describe

0x06 设置开机启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
yum install initscripts -y
mkdir /tmp/runlog
cd /etc/init.d/
sudo vi zookeeper
#!/bin/bash
export JAVA_HOME=/chj/pkg/jdk
export PATH=$JAVA_HOME/bin:$PATH
export LOG_DIR=/tmp/kafka/runlog

#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper

case $1 in
start)
echo -n "Starting zookeeper:"
/usr/local/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &
echo " done."
exit 0
;;
stop)
echo -n "Stopping Kafka: "
/usr/local/kafka/bin/zookeeper-server-stop.sh
echo " done."
exit 0
;;
status)
c_pid=`ps -ef | grep zookeeper.properties | grep -v grep | awk '{print $2}'`
if [ "$c_pid" = "" ] ; then
echo "Stopped"
exit 3
else
echo "Running $c_pid"
exit 0
fi
;;
restart)
stop
start
;;
*)
echo "Usage: zookeeper start|stop|status|restart"
;;
esac

sudo chmod 755 zookeeper
sudo service zookeeper status
sudo chkconfig --add zookeeper
sudo chkconfig --list
sudo chkconfig zookeeper on

其它同样操作。

0x07 Spark Install on Yarn

机器互相

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
配置hosts
在每台主机上修改host文件

vi /etc/hosts
10.1.1.107 master
10.1.1.108 slave1
10.1.1.109 slave2
配置之后ping一下用户名看是否生效

ping slave1
ping slave2
SSH 免密码登录
安装Openssh server

sudo apt-get install openssh-server
在所有机器上都生成私钥和公钥

ssh-keygen -t rsa #一路回车
需要让机器间都能相互访问,就把每个机子上的id_rsa.pub发给master节点,传输公钥可以用scp来传输。

scp ~/.ssh/id_rsa.pub spark@master:~/.ssh/id_rsa.pub.slave1
在master上,将所有公钥加到用于认证的公钥文件authorized_keys中

cat ~/.ssh/id_rsa.pub* >> ~/.ssh/authorized_keys
将公钥文件authorized_keys分发给每台slave

scp ~/.ssh/authorized_keys spark@slave1:~/.ssh/
在每台机子上验证SSH无密码通信

ssh master
ssh slave1
ssh slave2
如果登陆测试不成功,则可能需要修改文件authorized_keys的权限(权限的设置非常重要,因为不安全的设置安全设置,会让你不能使用RSA功能 )

chmod 600 ~/.ssh/authorized_keys

安装scala

下载spark对应的Scala,注意不要下错版本,我这里下了 2.11.8,官方下载地址; 使用版本: Spark 2.1.3 Sacala 2.11.8 kafka 0.10.1.0 JDK 1.8

1
2
3
4
5
6
7
8
9
10
11
12
13
sudo wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
sudo tar xf scala-2.11.8.tgz
sudo mv scala-2.11.8 /usr/local/scala

再次修改环境变量sudo vi /etc/profile,添加以下内容:
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

同样的方法使环境变量生效,并验证 scala 是否安装成功

$ source /etc/profile #生效环境变量
$ scala -version #如果打印出如下版本信息,则说明安装成功
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

安装配置Hodoop Yarn

下载地址。 需要配置有以下7个文件:hadoop-env.sh,yarn-env.sh,slaves,core-site.xml,hdfs-site.xml,maprd-site.xml,yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
sudo wget https://archive.apache.org/dist/hadoop/core/current/hadoop-3.1.1.tar.gz
sudo tar zxvf hadoop-3.1.1.tar.gz
sudo mv hadoop-3.1.1 /usr/local/hadoop
cd /usr/local/hadoop/etc/hadoop

1.在hadoop-env.sh中配置JAVA_HOME
export JAVA_HOME=/usr/local/jdk

2.在yarn-env.sh中配置JAVA_HOME
export JAVA_HOME=/usr/local/jdk

3.在slaves中配置slave节点的ip或者host, 没有就先不配置
slave1
slave2

4.修改core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000/</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/usr/local/hadoop/tmp</value>
</property>
</configuration>

5.修改hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:9001</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/home/spark/workspace/hadoop-2.6.0/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/home/spark/workspace/hadoop-2.6.0/dfs/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>

6.修改mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

7.修改yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8035</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
</configuration>

将配置好的hadoop文件夹分发给所有slaves
sudo scp -r ~/usr/loacal/hadoop spark@slave1:~/usr/local/

启动 Hadoop, 在 master 上执行以下操作,就可以启动 hadoop 了。
cd /usr/loacal/hadoop #进入hadoop目录
sudo bin/hadoop namenode -format #格式化namenode
sudo sbin/start-dfs.sh #启动dfs
sudo sbin/start-yarn.sh #启动yarn

缺少用户定义造成的问题,在start-dfs.sh, 顶部空白处添加内容:
HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

用户定义造成的问题,在start-yarn.sh, 顶部空白处添加内容:
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root


验证 Hadoop 是否安装成功
可以通过jps命令查看各个节点启动的进程是否正常。在 master 上应该有以下几个进程:
$ sudo jps #run on master
3407 SecondaryNameNode
3218 NameNode
3552 ResourceManager
3910 Jps

在每个slave上应该有以下几个进程:

$ sudo jps #run on slaves
2072 NodeManager
2213 Jps
1962 DataNode
或者在浏览器中输入 http://master:8088 ,应该有 hadoop 的管理界面出来了,并能看到 slave1 和 slave2 节点。

安装spark

下载地址。 可以兼容hadoop2.7以后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
sudo wget http://mirror.bit.edu.cn/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
sudo tar zxvf spark-2.4.0-bin-hadoop2.7.tgz
sudo mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark
cd /usr/local/spark/conf/
sudo cp spark-env.sh.template spark-env.sh

export SCALA_HOME=/usr/local/scala
export JAVA_HOME=/usr/local/jdk
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_MASTER_IP=master
SPARK_LOCAL_DIRS=/usr/local/spark
SPARK_DRIVER_MEMORY=1G

注:在设置Worker进程的CPU个数和内存大小,要注意机器的实际硬件条件,如果配置的超过当前Worker节点的硬件条件,Worker进程会启动失败。

vi slaves在slaves文件下填上slave主机名:
slave1
slave2
将配置好的spark文件夹分发给所有slaves吧

sudo scp -r ~/usr/local/spark spark@slave1:~/usr/local/

启动Spark
sudo sbin/start-all.sh
验证 Spark 是否安装成功
用jps检查,在 master 上应该有以下几个进程:

$ sudo jps
7949 Jps
7328 SecondaryNameNode
7805 Master
7137 NameNode
7475 ResourceManager
在 slave 上应该有以下几个进程:

$ sudo jps
3132 DataNode
3759 Worker
3858 Jps
3231 NodeManager
进入Spark的Web管理页面: http://master:8080


运行示例:
#本地模式两线程运行
./bin/run-example SparkPi 10 --master local[2]

#Spark Standalone 集群模式运行
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 lib/spark-examples-1.3.0-hadoop2.4.0.jar 100

#Spark on YARN 集群上 yarn-cluster 模式运行
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
lib/spark-examples*.jar \
10

单机运行

安装好jdk以后; 下载地址; 其它版本

1
2
3
4
5
6
7
8
sudo wget http://mirror.bit.edu.cn/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
sudo tar zxvf spark-2.4.0-bin-hadoop2.7.tgz
sudo mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark
sudo vim /etc/profile
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
$ source /etc/profile #生效环境变量
$ spark-shell #如果打印出如下版本信息,则说明安装成功; 如果出现主机名报错,修改hosts信息

在sbin的目录下的spark-config.sh 文件下添加JAVA_HOME的索引; 启动

1
$ sudo /usr/local/spark/sbin/start-master.sh

添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sudo /usr/local/spark/bin/spark-submit --master spark://spark-master:7077 --class com.chj.sec.skyeye /opt/down/Skyeye2.jar
如果报错,就需要复制依赖包到集群的master和slave机器的spark目录下的jars里面。

--master:用于设置主结点URL的参数。
local:用于执行本地机器的代码。Spark运行一个单一的线程,在一个多核机器上,通过local[n]来指定一个具体使用的内核数,n指使用的内核数目,local[*]来指定运行和Spark机器内核一样多的复杂线程。
spark://host:port:这是一个URL和一个Spark单机集群的端口。
mesos://host:port:这是一个URL和一个部署在Mesos的Spark集群的端口。
yarn:作为负载均衡器,用于从运行Yarn的头结点提交作业。
--deploy-mode:允许决定是否在本地(使用client)启动Spark驱动成簇的参数,或者在集群内(使用cluster选项)的其中一台工作机器上启动。默人是client。
--name:应用程序名称。注意,创建SparkSession时,如果是以编程方式指定应用程序名称,那么来自命令行的参数会被重写。
--py-files:.py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序,这些文件将被交付给每一个执行器来使用。
--files:命令给出一个逗号分隔的文件列表,这些文件将被交付给每一个执行器来使用。
--conf:参数通过命令行动态地更改应用程序的配置。语法是:<Spark property>=<value for the property>。
--properties-file:配置文件。它应该有和conf/spark-defaults.conf文件相同的属性设置,也是可读的。
--driver-memory:指定应用程序在驱动程序上分配多少内存的参数。允许的值又一个语法限制,类似于1000M,2G。默认值是1024M。
--exectuor-memory:参数指定每个执行器为应用程序分配多少内存。默认值是1G。
--help:展示帮助信息和退出。
--verbose:在运行应用程序时打印附加调试信息。
--version:打印Spark版本。
--class: 主类名称,含包名
--name: Application名称
--jars: Driver依赖的第三方jar包
--driver-cores: Driver程序的使用CPU个数,仅限于Spark Alone模式
--supervise: 失败后是否重启Driver,仅限于Spark Alone模式
--total-executor-cores: executor使用的总核数,仅限于Spark Alone、Spark on Mesos模式
--executor-cores: 每个executor使用的内核数,默认为1,仅限于Spark on Yarn模式
--queue: 提交应用程序给哪个YARN的队列,默认是default队列,仅限于Spark on Yarn模式
--num-executors: 启动的executor数量,默认是2个,仅限于Spark on Yarn模式
--archives: 仅限于Spark on Yarn模式

0x08 spark开发

新建项目

下载Intelij,直接安装; 新建scala项目并配置spark依赖和对应的kafka版本进行开发。
新建项目

选择 Scala -> SBT, 然点击 Next。 (SBT 是一个互动式的编译工具,详细了解看到 官网 查看。)
然后给自己的项目取个名字,接着再根据自己Spark版本,选择合适的Java、Scala版本,最后点击 Finish 即可。
新建项目

引入依赖包

引入Spark的依赖, 进入项目后,点击左上角 File -> Project Structure。 点击 Libraries ,然后点 绿色的加号。这时出现了三个选项: Java, Maven, Scala SDk。
导入依赖

也可以从Modules -> Dependencies 导入本地依赖包。
导入依赖

如果你选择maven方式添加spark,可以参考这里,来获取 对于的maven链接。形如:org.apache.spark:spark-core_2.11:2.4.0; 代表:scala-2.11.* spark-2.4.0; 这个需要和线上的保持一致。
导入依赖
接着就可以编写你的项目了!

编写项目

简单任务

导出jar包

引入Spark的依赖, 进入项目后,点击左上角 File -> Project Structure。 点击 Artifacts,如下:
jar包
jar包
jar包

点击build,选择build Artifacts进行构建
jar包
jar包

其它

kafka连接方式
Direct方式连接:将会创建跟kafka分区一样多的RDD partiions,并行的读取kafka topic的partition数据。kafka和RDD partition将会有一对一的对应关系。
Receiver方式连接:Receiver-based Approach需要启用WAL才能保证消费不丢失数据,效率比较低。

offset设置:
Spark Streaming checkpoints, 将offsets存储在HBase中, 将offsets存储到 ZooKeeper中, Kafka 本身, 其他方式。

spark官方推荐使用ml, 因为ml功能更全面更灵活,未来会主要支持ml,mllib很有可能会被废弃(据说可能是在spark3.0中deprecated)
ml主要操作的是DataFrame,DataFrame是Dataset的子集,也就是Dataset[Row], 而DataSet是对RDD的封装,对SQL之类的操作做了很多优化
ml中的操作可以使用pipeline, 跟sklearn一样,可以把很多操作(算法/特征提取/特征转换)以管道的形式串起来,然后让数据在这个管道中流动。大家可以脑补一下Linux管道在做任务组合时有多么方便。
1.把所有的攻击语句里面的特殊字符和数字,函数转化为指定的字母, 作为观察的几个维度;
2.把整个攻击语句泛化后的每个单词转化为数字,生成一个数组维度;
3.查看这组数组是否在语句中出现过

坚持原创技术分享,您的支持将鼓励我继续创作!