数据专栏

智能大数据搬运工,你想要的我们都有

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

变量的取值类型 因变量:连续变量,二分类变量,等级变量、多分类变量,连续带有删失变量 解释变量:连续、分类、等级变量 模型选择方式:基本公式(X,Y是否正态分布)
广义线性模型 y不是正态分布 指数分布族
glm()的用法 Logistic模型函数形式:
对数线性模型 分类变量 层次变量
一般线性模型
y正态分布,x非正态分布 完全随机设计方差分析,只考虑一个随机因素 随机单位组设计模型 #建立全变量logistic回归模型 d5.1=read.table('clipboard',header = T) logit<-glm(y~x1+x2+x3,family = binomial,data=d5.1) summary(logit) #逐步筛选变量logistic回归模型 logit.step=step(logit) summary(logit.step) #对数Poisson回归模型 d5.2=read.table('clipboard',header = T) log=glm(y~x1+x2,family=poisson,data=d5.2) summary(log) #一般线性回归模型 d5.3=read.table('clipboard',header = T) #完全随机设计方差分析 anova(lm(Y~factor(A),data = d5.3)) #随机单位组设计模型 d5.4=read.table('clipboard',header = T) anova(lm(Y~factor(A)+factor(B),data = d5.4))
参考资料: https://next.xuetangx.com/course/JNU07011000851/151569
大数据
2020-03-26 22:53:00
Redis中当内存达到极限时,主要采用了 6种内存淘汰策略/方式 进行内存对象的释放操作。 V olatile-lru:从设置了过期时间的数据集中,选择最近最少使用的数据释放。 A llkeys-lru:从数据集中(包括设置过期时间以及未设置过期时间的数据集中),选择最近最少使用的数据释放。 V olatile-random:从设置了过期时间的数据集中,随机选择一个数据进行释放。 A llkeys-random:从数据集中(包括了设置过期时间以及未设置过期时间的数据集)随机选择一个数据进行入释放。 V olatile-ttl:从设置了过期时间的数据集中,选择马上就要过期的数据进行释放。 N oeviction:不删除任意数据(但Redis还会根据引用计数器进行释放),这时如果内存不够时,会直接返回错误。
参考文档: https://www.cnblogs.com/WJ5888/p/4371647.html
大数据
2020-03-26 16:23:00
Redis使用的内存回收算法是 引用计数算法 和 LRU算法 。
1.引用计数算法: 对于创建的每一个对象都有一个与之关联的计数器,这个计数器记录着该对象被使用的次数,垃圾收集器在进行垃圾回收时,对扫描到的每一个对象判断一下计数器是否等于0,若等于0,就会释放该对象占用的内存空间,同时将该对象引用的其他对象的计数器进行减一操作。
算法实现方式: 引用计数算法的垃圾收集一般有侵入式与非侵入式两种,侵入式的实现就是将引用计数器直接根植在对象内部,用C++的思想进行解释就是,在对象的构造或者拷贝构造中进行加一操作,在对象的析构中进行减一操作;非侵入式思想就是有一块单独的内存区域,用作引用计数器。
算法优点 : 使用引用计数器,内存回收可以穿插在程序的运行中,在程序运行中,当发现某一对象的引用计数器为0时,可以立即对该对象所占用的内存空间进行回收,这种方式可以避免FULL GC(完全垃圾收集)时带来的程序暂停,Redis中就是在引用计数器为0时,对内存进行了回收。
算法缺点: 采用引用计数器进行垃圾回收,最大的缺点就是不能解决循环引用的问题,例如一个父对象持有一个子对象的引用,子对象也持有父对象的引用,这种情况下,父子对象将一直存在于JVM的堆中,无法进行回收。
2.LRU算法 : LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录该页面自上次被访问以来所经历的时间 t,当必须淘汰一个页面时,选择现有页面中其 t 值最大的,即最近最少使用的页面给予淘汰。 LRU算法最为经典的实现,就是 HashMap+Double LinkedList , 时间复杂度为 O(1) ,但是 如果按照HashMap和双向链表实现,需要额外的存储存放next和prev指针,牺牲比较大的存储空间,显然是不划算的。 所以Redis中的LRU算法 ,就是随机取出若干个key,然后按照访问时间排序后,淘汰掉最不经常使用的那个。
参考文档: https://my.oschina.net/u/4480939/blog/write https://www.cnblogs.com/WJ5888/p/4371647.html https://www.cnblogs.com/WJ5888/p/4359783.html
大数据
2020-03-26 16:08:00
HDFS上传流程
客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。 NameNode返回是否可以上传。 客户端请求第一个 block上传到哪几个datanode服务器上。 NameNode返回3个datanode节点,分别为dn1、dn2、dn3。 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。 dn1、dn2、dn3逐级应答客户端。 客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。 当一个block传输完成之后,客户端再次请求NameNode上传第二个block的服务器。(重复执行3-7步)。
HDFS读流程
客户端跟namenode通信,请求下载某个数据。 namenode查询元数据信息以及block位置信息。 将数据所在的datanode信息返回给客户端。 客户端根据数据所在的datanode,挑选一台距离自己最近的datanode,并向其发出下载文件的请求(若所需数据不在一台datanode上保存,则分别向多台datanode发出请求)。 datanode响应客户端请求,将数据返回给客户端。 从多个datanode获得的数据不断在客户端追加,形成完整的数据
大数据
2020-03-26 12:01:00
速度快 ,因为数据都存于内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)。 支持丰富的数据类型 ,支持string、list、set、zset、hashmap等数据类型。 支持事务,操作都是原子性 ,即对数据的更改要么全部执行,要么全部不执行;R edis 对事务是部分支持的,如果是在入队时报错,那么都不会执行;在非入队时报错,那么成功的就会成功执行。 丰富的特性 , 可用作数据库、缓存和消息中间件,可以按 key 设置过期时间,过期后将会自动删除。
大数据
2020-03-26 11:37:00
如果master异常,则会进行master-slave切换,将其中一个slave作为master,将之前的master作为slave。
哨兵作用
哨兵是Redis集群架构中非常重要的一个组件,主要功能如下: 集群监控:负责监控redis master和slave进程是否正常 消息通知:如果某个redis实例有故障,那么哨兵负责发送消息作为报警通知给管理员 故障转移:如果master节点挂掉了,会自动转移到slave节点上 配置中心:如果故障转移发生了,通知client客户端新的master地址
哨兵的核心知识 故障转移时,判断一个master节点是否宕机了,需要大部分的哨兵都同意才行,涉及到了分布式选举的问题 哨兵至少需要3个实例,来保证自己的健壮性 哨兵+redis主从的部署架构,是不会保证数据零丢失的,只能保证redis集群的高可用性
sdown和odown sdown和odown两种失败的状态 sdown是主观宕机,就一个哨兵如果自己觉得一个master宕机了,那么就是主观宕机 odown是客观宕机,如果quorum数量的哨兵都觉得一个master宕机了,那么就是客观宕机 sdown达成的条件:如果一个哨兵ping一个master,超过了is-master-down-after-milliseconds指定的毫秒数之后,就认为master宕机 odown达成条件:如果一个哨兵在指定的时间内,收到了quorum指定数量的其他哨兵也认为那个master是宕机了,那么就认为是odown了,客观认为master宕机了
quorum和majority quorum:确认odown的最少哨兵数量 majority:授权进行主从切换的最少哨兵数量 每一个哨兵要做主备切换,首先需要quorum数量的哨兵认为odown,然后选举出一个哨兵来做切换,这个哨兵还得得到majority哨兵的特权,才能进行切换。 如果quorummajority,那么必须quorum数量的哨兵都授权,比如5个哨兵,quorum是5,那么必须5个哨兵都同意授权才能执行。(谁多听谁的)
为什么哨兵至少3个节点?
哨兵集群必须部署两个以上节点。如果哨兵集群仅仅部署了2个哨兵实例,那么它的majority就是2(2的majority=2,3的majority=2,5的majority=3,4的majority=2),如果其中一个哨兵宕机了,就无法满足majority>=2这个条件,那么master发生故障时也就无法进行主从切换了。
工作原理
每个Sentienl以每秒钟一次的频率向他所知的Master,Slave以及其他的Sentinel实例发送一个ping命令 如果一个实例距离最后一次有效回复ping命令的时间超过了down-after-milliseconds选项所指的值,则这个实例会被Sentinel标记为主观宕机 如果一个master被标记为主观宕机,则正在监视这个master的所有sentinel要以每一秒一次的频率确认Master的确进入了主观宕机状态 当有足够数量的Sentinel(大于等于配置文件所指的值)在指定的时间范围内确认master的确进入了主观宕机状态,则master会被标记为客观状态 在一般情况下,每个Sentinel会以1次/10秒的频率向他一致的所有master,slave发送INFO命令 当master被Sentinel标记为客观宕机是,Sentinel向下线的master的所有slave发送INFO命令的频率会从1次/10秒改为1次/秒 若没有足够数量的Sentinel同意master已经下线,master的客观宕机状态就会被移除;若master重新想Sentinel的ping命令返回有效回复,master的主观宕机状态就会被移除。
哨兵模式的配置
首先 配置redis的主从服务器 ,修改redis.conf文件如下 # 使得Redis服务器可以跨网络访问 bind 0.0.0.0 # 设置密码 requirepass "123456" # 指定主服务器,注意:有关slaveof的配置只是配置从服务器,主服务器不需要配置 slaveof 192.168.11.128 6379 # 主服务器密码,注意:有关slaveof的配置只是配置从服务器,主服务器不需要配置 masterauth 123456
上述内容主要是配置Redis服务器,从服务器比主服务器多了一个slaveof的配置和密码
配置3个哨兵 ,每个哨兵都是一样的。在Redis安装目录下有一个sentinel.conf文件,copy一份进行修改 # 禁止保护模式 protected-mode no # 配置监听的主服务器,这里sentinel monitor代表监控,mymaster代表服务器的名称,可以自定义,192.168.11.128代表监控的主服务器,6379代表端口,2代表只有两个或两个以上的哨兵认为主服务器不可用的时候,才会进行failover操作。 sentinel monitor mymaster 192.168.11.128 6379 2 # sentinel author-pass定义服务的密码,mymaster是服务名称,123456是Redis服务器密码 # sentinel auth-pass sentinel auth-pass mymaster 123456
启动服务器和哨兵 ,进入Redis安装目录的src目录 # 启动Redis服务器进程 ./redis-server ../redis.conf # 启动哨兵进程 ./redis-sentinel ../sentinel.conf
注意启动顺序: 首先是主机(192.168.11.128)的Redis服务进程,然后启动丛机的服务进程,最后启动3个哨兵的服务进程
Java中使用哨兵模式 /** * 测试Redis哨兵模式 * @author liu */ public class TestSentinels { @SuppressWarnings("resource") @Test public void testSentinel() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); jedisPoolConfig.setMaxIdle(5); jedisPoolConfig.setMinIdle(5); // 哨兵信息 Set sentinels = new HashSet<>(Arrays.asList("192.168.11.128:26379", "192.168.11.129:26379","192.168.11.130:26379")); // 创建连接池 JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels,jedisPoolConfig,"123456"); // 获取客户端 Jedis jedis = pool.getResource(); // 执行两个命令 jedis.set("mykey", "myvalue"); String value = jedis.get("mykey"); System.out.println(value); } }
大数据
2020-03-26 11:24:00
Redis不仅支持简单的key-value数据类型,同时还提供string、list、set、zset、hash等数据结构的存储;而Memcached仅仅支持简单的key-value数据类型。 Redis支持数据的持久化,可以将内存中的数据保存到磁盘中,重启的时候再次加载使用;而Memcached将数据全部存于内存中。 Redis支持数据备份,即master-slave模式的数据备份。 Redis比Memcached的(读写)速度要快很多。 Redis使用的是单线程IO复用模型;而Memcached使用的是多线程非阻塞IO复用模型。
大数据
2020-03-26 11:24:00
Redis是一个高性能的、开源免费的、遵守BSD协议的key-value数据库。有以下三个特点:第一,Redis支持数据的持久化,可以将内存中数据保存在磁盘中,重启的时候可以再次加载进行使用;第二,Redis不仅仅支持简单的key-value类型的数据,同时还支持string、list、set、sorted set(zset)、hash数据类型;第三,Redis支持数据的备份,即master-slave模式的数据备份。
大数据
2020-03-26 10:38:00
session cluster和per job 因为是源码分析,所以会分为服务端和客户端两个部分的代码分析,下面我先看服务端

session cluster模式是类似standalone,先去向yarn申请好资源,然后供业务方提交,主要的入口类是YarnSessionClusterEntrypoint(这里指的是服务端的入口)


从上图可以看出来,startCluster()方法前后是两个分界线,startCluster之前是获取配置,之后是进行集群相关的创建,包括haService/blobServer/heartBeatService/resourceManger/webMonitorEndpoint。

这里有一点是需要说明的是有关executionGraphStore, 这里实际有两种,
1.将可执行图放在内存中,
2.将可执行图持久化到文件。

yarn session:将executionGraph持久化到文件

per job:将executionGraph持久化到文件

对于per job模式是每个任务对应一个集群,其实就是将上图中的YarnSessionClusterEntrypoint改成YarnJobClusterEntrypoint,其它流程基本一致(除去executionGrap的存储)。

下面来看一下两个主类的继承关系图



从图上可以看到主要的区别就是createSerializeableExecutionGraphStore方法,也就是executionGraph的存储位置不同。

session client和per job 由于flink不同的版本代码变动较大,所以在这里需要区分flink的版本进行一下说明 flink1.9之前的基本一致,提交至yarn的主要流程都在CliFrontend和FlinkYarnSessionCli中, 我们来看一下主要流程



这里session和per job的在流程上的最大区别就是clusterId是否为空 flink1.9之后进行了流程统一,抽象出了一个PipelineExecutor接口,统筹所有的提交,不过在看继承关系之前还是先看一下yarn-client的提交流程其实主要入口还是CLiFrontened,不过在加载完配置文件之后就直接反射调用invokeInteractiveModeForExecution,这个类会调用用户的main函数,加载完用户业务代码之后,会去走正常的提交流程。 到这里已经将所有的提交流程都说完了,大家对于flink争个提交流程应该有了更加清晰的认识。

最后在来说一下flink submit的接口,这是在flink-1.10才出现的一个新的统一,流程图如下



从上图可以看出来,AbstractSessionClusterExecutor中的主要调用逻辑其实和上面我们已经看到的session cluster的提交流程是一致的,只不过代码更加的抽象,这样其实扩展性也更加好,AbstractJobClusterExecutor主要主要就是为了向已有集群提交任务的,LocalExecutor其实是为了用户本地调试所用
欢迎关注我的公众号
大数据
2020-03-26 10:31:00
hdfs的集群中的三种角色:Namenode(NN),Datanode(DN),SecondNamenode(SN)
Namenode的工作机制
NN是hdfs的管理节点,主要职责包括:1.管理文件系统的命名空间,维护着hdfs中的虚拟文件目录树结构。2.保存文件系统中所有文件和目录元数据信息(机制复杂,内存中一份完整的,fsimage和edits log加起来是一份完整的)。3.相应客户端的请求,无论读写hdfs都会先访问NN,节点的交互细节都被封装在FileSystem类中了。4.NN中也会记载每个文件的各个块所在数据节点信息,但是NN不会持久化保存文件块与节点的映射信息,因为这些信息会在系统启动时候根据数据节点汇报上来的信息进行重建。
客户端向集群中存储数据之前,首先需要访问NN申请写入文件,NN检测对应的虚拟文件是否存在,如果不存在就向客户端返回可以存入并返回分配的DN,客户端对文件进行切分,将各个block存入NN返回的DN列表中写入。而一个blk的多个副本是由DN向其他的节点写入的,而不是由客户端来直接写入的,当DN在写某个副本时候失败了,会将失败信息返回给NN,NN会重新分配DN进行副本写入。文件的最后一个blk可能不满128M。但是这样的一个blk也要在NN中含有一条元信息记录(一个blk的元数据信息大约150B)。所以hdfs不善于存储小文件,因为小文件耗费NN的存储空间,MapReduce的性能也降低。
向集群中读写文件,都需要访问NN,所以NN的负载很大。那么如何提高NN的相应速度呢?每次对NN的访问,都会涉及到元数据的读取,为了尽可能快速读取元数据,可以将元数据全放内存,但是内存是遗失性存储,这样做安全性无法得到保证,但是元数据全放磁盘则查询又会太慢。
所以hdfs维护了两份元数据:内存中一份元数据(为了提高查询速度,所以也可以看到,NN需要将元数据全部都放在内存中,所以内存大小可能会限制整个hdfs的文件存储规模),磁盘中一份数据fsimage(为了元数据持久化),还有一个用于记录hdfs集群最新操作日志的小文件edits log。内存中的元数据和fsimage不总是完全一致的,内存中的元数据总是领先fsimage一个edits log的内容。因为最新的元数据总是先写入edits log,写入成功后让客户端去写文件,文件写入成功之后,NN将这份最新的元数据加载到内存中。当SN执行checkpoint操作时,会将fsimage和editslog的内容合并成新的新的fsimage,这个时候fsimage和内存中的元数据几乎相同(可能仍然还差着一个edits.new)。
edits log是一个小文件(默认是64M),并不能存太多的数据,以免影响写入速度。所以当edits log写满了的时候,则应该将edits log中的数据全部合并到fsimage中。但是edits log是日志格式,与fsimage的格式不一样,所以需要一定的资源用于合并,如果这个任务交给NN,则会加大NN的压力,所以引入了SN来解决fsimage和edits log合并的问题。当edits log写满时或者到了一个额定时间(默认3600s),NN通知SN来做checkpoint操作合并fsimage和edits log。SN进行check point操作:首先通知NN不要继续向原来edits中写入新数据了,NN把最新的元数据写入到edits.new中,然后SN从NN下载fsimage和edits,SN将fsimage导入内存,然后对其应用edits中的日志操作,最后生成一个fsimage.checkpoint文件并保存,然后将fsimage.checkpoint上传到NN。NN用将fsimage.checkpoint重命名为fsimage,edits.new重命名为edits,以替换掉旧的fsimage和edits。
以上的这一套机制,只能做到数据可靠而无法保证系统高可用。猜想:双NN,其中一个作为完全热备,但是需要保证双NN上的元数据的一致性。
Datanode的工作机制
DN提供真实数据的存储服务和数据库检索服务,DN会定期向NN发送他们所存储的块的列表。hdfs参数dfs.block.size能够控制块大小,默认是128M。即使一个文件没有达到block的大小,仍然会占用一条元数据。所以说hdfs最好存大文件,小文件比较费NN的元数据存储空间。hdfs存储文件block不会添加任何额外的内容,完全就是按照字节数来切分,到一个大小就切,到一个大小就开始切。
HDFS的java客户端编写
启动centos的图形界面命令:init 5或者startx
eclipse添加依赖包的过程:java build path->Libraries->Add Library->User Library->User libraries->New->add extermal JARs import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Before; import org.junit.Test; public class HdfsUtil { FileSystem fs = null; public void init() throws Exception{ //读取classpath下的xxx-site.xml 配置文件,并解析其内容,封装到conf对象中 Configuration conf = new Configuration(); //也可以在代码中对conf中的配置信息进行手动设置,会覆盖掉配置文件中的读取的值 conf.set("fs.defaultFS", "hdfs://weekend110:9000/"); //根据配置信息,去获取一个具体文件系统的客户端操作实例对象,记得hdfs的权限问题 fs = FileSystem.get(new URI("hdfs://weekend110:9000/"),conf,"hadoop"); } /** * 上传文件,比较底层的写法 * * @throws Exception */ public void upload() throws Exception { init(); Path dst = new Path("hdfs://weekend110:9000/aa/qingshu.txt"); FSDataOutputStream os = fs.create(dst); FileInputStream is = new FileInputStream("c:/qingshu.txt"); IOUtils.copy(is, os); } /** * 上传文件,封装好的写法 * @throws Exception * @throws IOException */ public void upload2() throws Exception, IOException{ init(); fs.copyFromLocalFile(new Path("c:/qingshu.txt"), new Path("hdfs://weekend110:9000/aaa/bbb/ccc/qingshu2.txt")); } /** * 下载文件,比较底层的写法 * @throws Exception */ public void download2() throws Exception { init(); FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); FileOutputStream os = new FileOutputStream("c:/jdk7.tgz"); IOUtils.copy(is, os); } /** * 下载文件,封装好的写法 * @throws Exception * @throws IllegalArgumentException */ public void download() throws Exception { init(); fs.copyToLocalFile(new Path("hdfs://weekend110:9000/aa/qingshu2.txt"), new Path("c:/qingshu2.txt")); } /** * 查看文件信息 * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException * */ public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException { init(); // listFiles列出的只是文件信息,不会列出文件夹的信息,可以对文件夹递归遍历 RemoteIterator files = fs.listFiles(new Path("/"), true); while(files.hasNext()){ LocatedFileStatus file = files.next(); Path filePath = file.getPath(); String fileName = filePath.getName(); System.out.println(fileName); } System.out.println("---------------------------------"); //listStatus 可以列出文件和文件夹的信息,但是不提供自带的递归遍历 FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status: listStatus){ String name = status.getPath().getName(); System.out.println(name + (status.isDirectory()?" is dir":" is file")); } } /** * 创建文件夹 * @throws Exception * @throws IllegalArgumentException */ public void mkdir() throws IllegalArgumentException, Exception { init(); fs.mkdirs(new Path("/aaa/bbb/ccc")); } /** * 删除文件或文件夹 * @throws IOException * @throws IllegalArgumentException */ public void rm() throws IllegalArgumentException, IOException { init(); fs.delete(new Path("/aa"), true); } public static void main(String[] args) throws Exception { download(); } }
fileSystem设计思想
将所有的文件系统抽象成了一个FileSystem之后,访问具体的对象时候,只管调用方法,不关心底层的实例对象到底是谁,这样MapReduce程序与底层的文件系统解耦合了。
hadoop框架中的RPC调用机制
RPC 远程过程调用,主要应用于分布式系统。比如有三台机器,a,b,c。a是一个客户端,访问了b,b中没有请求的服务,这个服务在远程的c机器上。b通过RPC机制能够向服务在本地一样地去调用c上的服务程序。具体如下:b将调用的服务类,方法,方法的参数等信息封装以下通过本地的socket client发送给c上的socket server,然后c机器根据调用信息,使用反射机制,获得一个服务类对象,调用相应的方法,然后再将方法的执行结果通过本地socket client发送给b上的一个socket server,然后b将这个结果解析出来。
NN和client,NN和DN,DN和DN之间很多通信,使用了大量的RPC通信机制,例如DN会定期的向NN报告本身的block情况。HADOOP实现了一个RPC框架——代理类机制(使用了动态代理和反射和socket技术)。1.生成调用端socket程序的动态代理对象。2.调用动态代理对象的业务方法。3.调用socket的请求方法。4发送调用请求。5.服务端开始了:生成业务类的动态代理对象。6.调用业务类动态代理对象的具体业务方法。7.获取调用结果。8.返回调用结果给服务端。9.给原始调用者返回结果。
Hadoop的RPC框架调用
主要的以依赖包都在share/common中。使用hadoop的RPC框架的最基本的部分:1.协议(表现为java接口),代理类和业务类都需要实现这个接口已确保方法的一致性。值得注意的是在协议接口中需要指定一个 public static final long versionID 的协议版本号,后面代理类调用方法时候,会用到这个参数,用来验证代理类和业务类的协议版本的一致性。2.业务类:一般类名以Impl结尾,在业务类中需要实现协议接口中的所有方法,这些被实现的方法都会以服务的形式发布出去。3.controller类,controller类接收客户端的请求,利用RPC.getProxy()方法生成一个代理类对象,然后用这个代理类对象调用一个协议接口中的方法,通过RPC机制远程调用业务类的方法,得到结果再返回给客户端。4.Starter类这个类名随意,主要的作用是将业务类中实现的协议接口的方法发布为一个服务:新建一个RPC.Builder类对象,然后设置服务的地址,端口,协议以及业务类,之后利用builder对象获得一个server对象,再启动起来就ok了。
1.协议接口 接口实现: public interface LoginServiceInterface { public static final long versionID=1L;//用于协定协议的版本号,RPC调用的时候会用到 public String login(String username,String password); }
2.服务端 业务类的实现: public class LoginServiceImpl implements LoginServiceInterface { @Override public String login(String username, String password) { return username + " logged in successfully!"; } } 启动类的实现: import java.io.IOException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Builder; import org.apache.hadoop.ipc.RPC.Server; public class Starter { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Builder builder = new RPC.Builder(new Configuration()); builder.setBindAddress("weekend110").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl()); Server server = builder.build(); server.start(); } }
客户端 import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; public class LoginController { public static void main(String[] args) throws Exception { LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("weekend110", 10000), new Configuration()); String result = proxy.login("mijie", "123456"); System.out.println(result); } }
思考题:服务的动态转发和负载均衡???zookeeper
hdfs 源码分析 FileSystem对象的创建过程(创建与NN进行通信的RPC代理类) hdfs的一个核心类:FileSystem,fs必要的成员:1.rpcProxy代理类,这个代理类应该实现一个接口clientProtocal
源码重点:如何根据conf来获取对应的fs,获取fs时候,如何获得RPCProxy代理类对象。
首先获取conf中的URI,然后将URI解析成scheme(hdfs)和authority(weekend110:9000)
getInternal方法:单例模式,懒汉模式。先从一个Map中根据Key来获取,如果没有获取到,再去创建Filesysytem
createFileSystem方法:获取对应的fileSystem的class,在反射出来一个相应对象(这个对象是空的,里面的各个数据域都是基本值)
fs.initalize()方法:这是具体实现类的方法,设置DFS类的dfs,uri,workDir。设置dfs时候是构造了一个DFSClient对象。dfs的namenode就是rpc代理对象。
总结:根据conf对象,利用反射机制拿到DistributedFileSystem(是FileSystem的一个继承子类)对象,然后设置fs的各个数据域,其中有一个很重要的数据域 打开输入流的过程(创建与DN进行通信的RPC代理类)
先利用那个namenode代理类与NN通信,拿到文件的元信息(各个block的位置)
DFSInputStream的blockReader是与DN进行交互获得各个blk的详细过程。
locatedBlocks记录各个blk的详细信息。
大数据
2020-03-25 23:02:00
Series基本功能: axes 返回行轴标签列表。 dtype 返回对象的数据类型(dtype)。 empty 如果系列为空,则返回True。 ndim 返回底层数据的维数,默认定义:1。 size 返回基础数据中的元素数。 values 将系列作为ndarray返回。 head() 返回前n行。 tail() 返回最后n行。
DataFrame基本功能 T 转置行和列。 axes 返回一个列,行轴标签和列轴标签作为唯一的成员。 dtypes 返回此对象中的数据类型(dtypes)。 empty 如果NDFrame完全为空[无项目],则返回为True; 如果任何轴的长度为0。 ndim 轴/数组维度大小。 shape 返回表示DataFrame的维度的元组。 size NDFrame中的元素数。 values NDFrame的Numpy表示。 head()返回开头前n行。 tail()返回最后n行。
T(转置)
返回DataFrame的转置。行和列将交换。
实例: import pandas as pd import numpy as np # Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} # Create a DataFrame df = pd.DataFrame(d) print ("The transpose of the data series is:") print df.T
执行上面示例代码,得到以下结果 - The transpose of the data series is: 0 1 2 3 4 5 6 Age 25 26 25 23 30 29 23 Name Tom James Ricky Vin Steve Minsu Jack Rating 4.23 3.24 3.98 2.56 3.2 4.6 3.8
axes
返回行轴标签和列轴标签列表
实例1: #Create a series with 100 random numbers s = pd.Series(np.random.randn(4)) print ("The axes are:") print s.axes
执行上面示例代码,得到以下输出结果 - The axes are: [RangeIndex(start=0, stop=4, step=1)]
实例2: #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Row axis labels and column axis labels are:") print df.axes
执行上面示例代码,得到以下结果 - Row axis labels and column axis labels are: [RangeIndex(start=0, stop=7, step=1), Index([u'Age', u'Name', u'Rating'], dtype='object')]
dtypes
返回每列的数据类型 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("The data types of each column are:") print df.dtypes
执行上面示例代码,得到以下结果 - The data types of each column are: Age int64 Name object Rating float64 dtype: object
empty
返回布尔值,表示对象是否为空; 返回True表示对象为空。 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Is the object empty?") print df.empty
执行上面示例代码,得到以下结果 - Is the object empty? False
ndim
返回对象的维数。 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Our object is:") print df print ("The dimension of the object is:") print df.ndim
执行上面示例代码,得到以下结果 - Our object is: Age Name Rating 0 25 Tom 4.23 1 26 James 3.24 2 25 Ricky 3.98 3 23 Vin 2.56 4 30 Steve 3.20 5 29 Minsu 4.60 6 23 Jack 3.80 The dimension of the object is: 2
shape
其中a表示行数,b表示列数。 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Our object is:") print df print ("The shape of the object is:") print df.shape
执行上面示例代码,得到以下结果 - Our object is: Age Name Rating 0 25 Tom 4.23 1 26 James 3.24 2 25 Ricky 3.98 3 23 Vin 2.56 4 30 Steve 3.20 5 29 Minsu 4.60 6 23 Jack 3.80 The shape of the object is: (7, 3)
size
返回元素数。 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Our object is:") print df print ("The total number of elements in our object is:") print df.size
执行上面示例代码,得到以下结果 - Our object is: Age Name Rating 0 25 Tom 4.23 1 26 James 3.24 2 25 Ricky 3.98 3 23 Vin 2.56 4 30 Steve 3.20 5 29 Minsu 4.60 6 23 Jack 3.80 The total number of elements in our object is: 21
values
以数组形式返回实际数据值。
实例1:
以数组形式返回系列中的实际数据值。 #Create a series with 4 random numbers s = pd.Series(np.random.randn(4)) print ("The actual data series is:") print s.values
执行上面示例代码,得到以下结果 - The actual data series is: [ 1.78737302 -0.60515881 0.18047664 -0.1409218 ]
实例2:
将DataFrame中的实际数据作为NDarray返回 #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("The actual data in our data frame is:") print df.values
执行上面示例代码,得到以下结果 - The actual data in our data frame is: [[25 'Tom' 4.23] [26 'James' 3.24] [25 'Ricky' 3.98] [23 'Vin' 2.56] [30 'Steve' 3.2] [29 'Minsu' 4.6] [23 'Jack' 3.8]]
head()和tail()
要查看DataFrame对象的小样本,可使用head()和tail()方法。head()返回前n行(观察索引值)。显示元素的默认数量为5,但可以传递自定义数字值。tail()返回最后n行(观察索引值)。显示元素的默认数量为5.
实例: #Create a Dictionary of series d = {'Name':pd.Series(['Tom','James','Ricky','Vin','Steve','Minsu','Jack']), 'Age':pd.Series([25,26,25,23,30,29,23]), 'Rating':pd.Series([4.23,3.24,3.98,2.56,3.20,4.6,3.8])} #Create a DataFrame df = pd.DataFrame(d) print ("Our data frame is:") print (df) print ("The first two rows of the data frame is:") print (df.head(2)) print ("The last two rows of the data frame is:") print (df.tail(2))
输出: Our data frame is: Name Age Rating 0 Tom 25 4.23 1 James 26 3.24 2 Ricky 25 3.98 3 Vin 23 2.56 4 Steve 30 3.20 5 Minsu 29 4.60 6 Jack 23 3.80 The first two rows of the data frame is: Name Age Rating 0 Tom 25 4.23 1 James 26 3.24 The last two rows of the data frame is: Name Age Rating 5 Minsu 29 4.6 6 Jack 23 3.8
大数据
2020-03-28 13:48:00
Pandas有两种排序方式,它们分别是 - 按标签 按实际值
按标签排序
使用sort_index()方法,通过传递axis参数和排序顺序,可以对Series, DataFrame进行排序。 默认情况下,按照升序对行标签进行排序。
实例: df = pd.Series(['E','B','C']) print(df.sort_index(axis=0,ascending=False)) unsorted_df = pd.DataFrame(np.random.randn(10,2),index=[1,4,6,2,3,5,9,8,0,7],columns = ['col2','col1']) #按行排列 print (unsorted_df.sort_index(axis=0, ascending=False)) # 按列排列 print (unsorted_df.sort_index(axis=1, ascending=False))
输出: 2 C 1 B 0 E dtype: object col2 col1 9 -0.680375 0.450634 8 0.354761 -0.919791 7 0.539276 -0.416008 6 -0.067286 0.513746 5 -0.191821 -1.265648 4 -1.075135 0.717537 3 -0.436641 0.007743 2 1.002102 -1.133920 1 -0.193714 0.664201 0 -0.495355 -0.727960 col2 col1 1 -0.193714 0.664201 4 -1.075135 0.717537 6 -0.067286 0.513746 2 1.002102 -1.133920 3 -0.436641 0.007743 5 -0.191821 -1.265648 9 -0.680375 0.450634 8 0.354761 -0.919791 0 -0.495355 -0.727960 7 0.539276 -0.416008
按值排序
像索引排序一样,sort_values()是按值排序的方法。它接受一个by参数,它将使用要与其排序值的DataFrame的列名称。
实例: df = pd.Series(['E','B','C']) print(df.sort_values(axis=0,ascending=False)) unsorted_df = pd.DataFrame({'col1':[2,1,1,1],'col2':[1,3,2,4]}) print (unsorted_df.sort_values(by=['col1', 'col2'], ascending=False))
输出: 0 E 2 C 1 B dtype: object col1 col2 0 2 1 3 1 4 1 1 3 2 1 2
排序算法
sort_values()提供了从mergeesort,heapsort和quicksort中选择算法的一个配置。Mergesort是唯一稳定的算法
方法 时间 工作空间 稳定性 速度
'quicksort'(快速排序) 'mergesort'(归并排序) 'heapsort'(堆排序)
O(n^2) O(n*log(n)) O(n*log(n))
0 ~n/2 0
否 是 否
1 2 3
实例: import pandas as pd import numpy as np unsorted_df = pd.DataFrame({'col1':[2,1,1,1],'col2':[1,3,2,4]}) sorted_df = unsorted_df.sort_values(by='col1' ,kind='mergesort') print (sorted_df)
执行上面示例代码,得到以下结果 - col1 col2 1 1 3 2 1 2 3 1 4 0 2 1
大数据
2020-03-28 13:19:00
Pandas对象之间的基本迭代的行为取决于类型。当迭代一个系列时,它被视为数组式,基本迭代产生这些值。其他数据结构,如:DataFrame和Panel,遵循类似惯例迭代对象的键。
简而言之,基本迭代(对于i在对象中)产生: Series - 值 DataFrame - 列标签 Pannel - 项目标签
迭代Series
迭代Series 的方法和python字典对象类似
实例: df = pd.Series(['A','B','C']) #遍历series的值 for item in df: print(item) print('\n') #遍历series的键 for item in df.keys(): print(item) print('\n') #遍历series的键-值 for item, value in df.items(): print(item, value) print('\n') for item in df.items(): print(item) print('\n') for item in df.iteritems(): print(item)
输出: A B C 0 1 2 0 A 1 B 2 C (0, 'A') (1, 'B') (2, 'C') (0, 'A') (1, 'B') (2, 'C')
迭代DataFrame
迭代DataFrame提供列名
实例: import pandas as pd import numpy as np N=20 df = pd.DataFrame({ 'A': pd.date_range(start='2016-01-01',periods=N,freq='D'), 'x': np.linspace(0,stop=N-1,num=N), 'y': np.random.rand(N), 'C': np.random.choice(['Low','Medium','High'],N).tolist(), 'D': np.random.normal(100, 10, size=(N)).tolist() }) for col in df: print (col)
执行上面示例代码,得到以下结果 - A C D x y
要遍历数据帧(DataFrame)中的行,可以使用以下函数 - iteritems() - 迭代(key,value)对 iterrows() - 将行迭代为(索引,系列)对 itertuples() - 以namedtuples的形式迭代行
iteritems()
将每个列作为键,将值与值作为键和列值迭代为Series对象。
实例: import pandas as pd import numpy as np df = pd.DataFrame(np.random.randn(4,3),columns=['col1','col2','col3']) for key,value in df.iteritems(): print (key,value)
执行上面示例代码,得到以下结果 - col1 0 0.802390 1 0.324060 2 0.256811 3 0.839186 Name: col1, dtype: float64 col2 0 1.624313 1 -1.033582 2 1.796663 3 1.856277 Name: col2, dtype: float64 col3 0 -0.022142 1 -0.230820 2 1.160691 3 -0.830279 Name: col3, dtype: float64
iterrows()
iterrows()返回迭代器,产生每个索引值以及包含每行数据的序列。
实例: import pandas as pd import numpy as np df = pd.DataFrame(np.random.randn(4,3),columns = ['col1','col2','col3']) for row_index,row in df.iterrows(): print (row_index,row)
执行上面示例代码,得到以下结果 - 0 col1 1.529759 col2 0.762811 col3 -0.634691 Name: 0, dtype: float64 1 col1 -0.944087 col2 1.420919 col3 -0.507895 Name: 1, dtype: float64 2 col1 -0.077287 col2 -0.858556 col3 -0.663385 Name: 2, dtype: float64 3 col1 -1.638578 col2 0.059866 col3 0.493482 Name: 3, dtype: float64
itertuples()
itertuples()方法将为DataFrame中的每一行返回一个产生一个命名元组的迭代器。元组的第一个元素将是行的相应索引值,而剩余的值是行值。
示例: import pandas as pd import numpy as np df = pd.DataFrame(np.random.randn(4,3),columns = ['col1','col2','col3']) for row in df.itertuples(): print (row)
执行上面示例代码,得到以下结果 - Pandas(Index=0, col1=1.5297586201375899, col2=0.76281127433814944, col3=- 0.6346908238310438) Pandas(Index=1, col1=-0.94408735763808649, col2=1.4209186418359423, col3=- 0.50789517967096232) Pandas(Index=2, col1=-0.07728664756791935, col2=-0.85855574139699076, col3=- 0.6633852507207626) Pandas(Index=3, col1=0.65734942534106289, col2=-0.95057710432604969, col3=0.80344487462316527)
注意 - 不要尝试在迭代时修改任何对象。迭代是用于读取,迭代器返回原始对象(视图)的副本,因此更改将不会反映在原始对象上。
示例: import pandas as pd import numpy as np df = pd.DataFrame(np.random.randn(4,3),columns = ['col1','col2','col3']) for index, row in df.iterrows(): row['a'] = 10 print (df)
执行上面示例代码,得到以下结果 - col1 col2 col3 0 -1.739815 0.735595 -0.295589 1 0.635485 0.106803 1.527922 2 -0.939064 0.547095 0.038585 3 -1.016509 -0.116580 -0.523158
大数据
2020-03-28 09:53:00
1.只需要引入HBase的客户端依赖即可。 org.apache.hbase hbase-client 1.2.1
2.HBaseUtils.java 工具类抽取 public class HBaseUtils { private static Configuration configuration; // 1.获取配置对象 static { configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum","hadoop1:2181,hadoop2:2181,hadoop3:2181"); } // 2.获取连接对象 public static Admin getAdmin() throws IOException { Connection connection = ConnectionFactory.createConnection(configuration); Admin admin = connection.getAdmin(); return admin; } /*获取table*/ public static Table getTable() throws IOException { return getTable("user_info"); } public static Table getTable(String tablename) throws IOException { Connection connection = ConnectionFactory.createConnection(configuration); return connection.getTable(TableName.valueOf(tablename)); } /*关闭table*/ public static void close(Table table) throws IOException { if (table != null){ table.close(); } } // 3.释放admin public static void close(Admin admin) throws IOException { admin.close(); } public static void close(Admin admin,Table table) throws IOException { close(admin); close(table); } }
3.DDL 操作 public class TableDDL { private HBaseAdmin admin; @Before public void before() throws IOException { admin = (HBaseAdmin) HBaseUtils.getAdmin(); } /*创建表*/ @Test public void createTable() throws IOException { // 1.创建表描述器对象 HTableDescriptor ht = new HTableDescriptor(TableName.valueOf("user_info")); // 2.添加列簇 HColumnDescriptor familyColumn1 = new HColumnDescriptor("base_info"); HColumnDescriptor familyColumn2 = new HColumnDescriptor("extra_info"); ht.addFamily(familyColumn1); ht.addFamily(familyColumn2); admin.createTable(ht); } /*删除表*/ @Test public void deleteTable() throws IOException { TableName tableName = TableName.valueOf("user_info"); HTableDescriptor user_info = new HTableDescriptor(tableName); if (!admin.isTableDisabled(tableName)){ admin.disableTable(tableName); } admin.deleteTable(tableName); } /*修改表*/ @Test public void modifyTable() throws IOException { TableName tableName = TableName.valueOf("user_info"); HTableDescriptor user_info = admin.getTableDescriptor(tableName); // 2.添加列簇 HColumnDescriptor familyColumn1 = new HColumnDescriptor("base_info2"); HColumnDescriptor familyColumn2 = new HColumnDescriptor("extra_info2"); user_info.addFamily(familyColumn1); user_info.addFamily(familyColumn2); admin.modifyTable(tableName,user_info); } /*查询所有的列簇*/ @Test public void listAllFamily() throws IOException { //查询所有的列簇 HTableDescriptor user_info = admin.getTableDescriptor(TableName.valueOf("user_info")); HColumnDescriptor[] columnFamilies = user_info.getColumnFamilies(); for (HColumnDescriptor ht : columnFamilies){ System.out.println(ht.getNameAsString()); } } /*删除一个列簇*/ @Test public void removeFamily() throws IOException { TableName tableName = TableName.valueOf("user_info"); HTableDescriptor user_info = admin.getTableDescriptor(tableName); //删除 //user_info.removeFamily(Bytes.toBytes("extra_info")); //提交修改 //admin.modifyTable(tableName,user_info); admin.deleteColumn(tableName,Bytes.toBytes("extra_info")); } @After public void after() throws IOException { HBaseUtils.close(admin); } }
4.DML 操作 public class TableDML { private Table table; @Before public void before() throws IOException { table = HBaseUtils.getTable(); } /*插入一条记录 * put 'ns1:t1','r1','c1','value' * 批量插入的话,直接放入list集合即可 * */ @Test public void put() throws IOException { // 1.创建put对象 Put put = new Put(Bytes.toBytes("001")); //行建 // 2.添加列数据 put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("name"),Bytes.toBytes("wang")); table.put(put); } /*查询一条记录*/ @Test public void get1() throws IOException { // 1.创建get对象 Get get = new Get(Bytes.toBytes("001")); Result result = table.get(get); NavigableMap base_info = result.getFamilyMap(Bytes.toBytes("base_info")); // 2.便利有序集合 Set> entries = base_info.entrySet(); for (Map.Entry entry : entries){ System.out.println(new String(entry.getKey() + "--->"+new String(entry.getValue()))); } } /*查询一条记录*/ @Test public void get2() throws IOException { // 1.创建get对象 Get get = new Get(Bytes.toBytes("001")); Result result = table.get(get); CellScanner cellScanner = result.cellScanner(); // 2.遍历 while (cellScanner.advance()){ //当前的cell Cell cell = cellScanner.current(); System.out.println(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()));//列簇 System.out.println(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()));//列名 System.out.println(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));//列值 } } /*查询一条记录*/ @Test public void get3() throws IOException { // 1.创建get对象 Get get = new Get(Bytes.toBytes("001")); Result result = table.get(get); CellScanner cellScanner = result.cellScanner(); // 2.遍历 while (cellScanner.advance()){ //当前的cell Cell cell = cellScanner.current(); System.out.println(new String(CellUtil.cloneRow(cell)));//行建 System.out.println(new String(CellUtil.cloneFamily(cell))); System.out.println(new String(CellUtil.cloneQualifier(cell))); System.out.println(new String(CellUtil.cloneValue(cell))); } } @After public void after() throws IOException { HBaseUtils.close(table); } }
大数据
2020-03-27 18:24:00
每个有状态且存在复杂状态转换的对象都包含一个状态机
例如org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl 代表一个mr job,其内部就包含一个状态机: public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, EventHandler { protected static final StateMachineFactory stateMachineFactory = new StateMachineFactory(JobStateInternal.NEW) .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT, JobEventType.JOB_INIT_FAILED, new InitFailedTransition()) //JobImpl 在NEW状态 下接收到 JOB_INIT_FAILED类型的事件后 进行InitFailedTransition转换, 转换之后 JobImpl的状态变为InitFailedTransition .addTransition(JobStateInternal.NEW, EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW), JobEventType.JOB_INIT, new InitTransition()) //JobImpl 在NEW状态 下接收到 JOB_INIT类型的事件后 进行InitTransition转换, 转换之后 JobImpl的状态可能为INITED、NEW 二者中的一个 //略 .installTopology(); //JobImpl 持有的状态机 private final StateMachine stateMachine; //构造函数 public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,...) { //略 stateMachine = stateMachineFactory.make(this); //将stateMachineFactory添加的各种Transition 生成状态机 //略 } }

状态机构建过程
状态机构建过程就是将之前stateMachineFactory添加的各种Transition 生成状态机,最核心的就是生成stateMachineTable。 Map>> stateMachineTable = new EnumMap>>();
stateMachineTable 维护了状态之间的转化关系: 一个状态可以转成成哪些状态 一个状态可以接受哪些类型的事件 一个状态接受到事件之后做何种处理(Transition)
状态转换过程
状态转换过程就是事件处理过程
//step1: handler 处理事件 //org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl public void handle(JobEvent event) { //略 getStateMachine().doTransition(event.getType(), event); //略 } //step2: //org.apache.hadoop.yarn.state.StateMachineFactory.InternalStateMachine public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitonException { currentState = StateMachineFactory.this.doTransition (operand, currentState, eventType, event); return currentState; } //step3: //org.apache.hadoop.yarn.state.StateMachineFactory private STATE doTransition (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event) throws InvalidStateTransitonException { Map> transitionMap = stateMachineTable.get(oldState); //获取当前状态下可以相应的事件 if (transitionMap != null) { Transition transition = transitionMap.get(eventType); //根据当前事件获取的转换 if (transition != null) { return transition.doTransition(operand, oldState, event, eventType); //执行转换 } }

大数据
2020-03-27 17:44:00
基本概念(Cluster Analysis) “物以类聚” 分析方法 系统聚类 快速聚类 类型 Q型聚类:对 样品 的聚类 R型聚类:对 变量 的聚类 聚类统计量 距离 欧氏距离 马氏距离 兰式距离 相关系数 距离矩阵 相关矩阵 距离矩阵计算——dist(),cor() 系统聚类法 基本思想:先将样品分成类,每个样品为一类,然后每次将具有最小距离的两类合并,合并后重新计算类与类之间的距离,直到所有样品归为一类为止 类间距离 的计算方法 最短距离法(single) 最长距离法(complete) 中间距离法(median) 类平均法(avera) 重心法(centroid) 离差平方和法(Ward) 通用公式 hclust(D,method=)
快速聚类法kmeans 概念:基本思想是将每一个样品分配给最近中心(均值)的类中 原理:n个对象分k类,类内 相似度 高,类间相似度低 相似度:类中对象的均值mean来计算 kmeans(x,centers) 不足:只有在类均值被定义的情况下才能使用,对孤立点、噪声影响敏感 knn,kmed,中位数 变量变换 平移变换 极差变换 标准差变换 主成分 对数 x1=c(2.5,3.0,6.0,6.6,7.2,4.0,4.7,4.5,5.5) x2=c(2.1,2.5,2.5,1.5,3.0,6.4,5.6,7.6,6.9) X=data.frame(x1,x2) D=dist(X,diag = TRUE,upper = FALSE) hc=hclust(D,"complete") hc names(hc) hc$merge hc$height #系统聚类图 plot(hc) rect.hclust(hc,3) #显示分类步骤 cutree(hc,9:1) #系统聚类分析步骤 library(mvstats) d7.2=read.table('clipboard',header = T) X7.2=msa.X(d7.2) plot(d7.2,gap=0) D=dist(d7.2) D H=H.clust(d7.2,"euclidean","single",plot=T)#最短距离法 H.clust(d7.2,"euclidean","complete",plot=T)#最长距离法 H.clust(d7.2,"euclidean","median",plot=T)#中间距离法 H.clust(d7.2,"euclidean","average",plot=T)#类平均法 H.clust(d7.2,"euclidean","centroid",plot=T)#重心法 H.clust(d7.2,"euclidean","ward",plot=T)#ward cutree(H,3) #快速聚类法 set.seed(123) x1=matrix(rnorm(1000,0,0.3)) x2=matrix(rnorm(1000,1,0.3)) X=rbind(x1,x2) H=hclust(dist(X)) plot(H) km=kmeans(X,2) km$cluster plot(X,pch=km$cluster) #10变量2000样品 set.seed(123) x1=matrix(rnorm(10000,0,0.3),ncol = 10) x2=matrix(rnorm(10000,1,0.3),ncol = 10) Y=rbind(x1,x2) km=kmeans(Y,2) km$cluster plot(Y,pch=km$cluster) km
参考资料: https://next.xuetangx.com/course/JNU07011000851/151569
大数据
2020-03-27 11:29:00
1.hbase是一个分布式系统,需要依赖HDFS作为存储介质,并依靠zookeeper来完成监控主从节点上下线。
2.安装准备如下: 1.HBase角色 HMaster(主节点,一台Active,一台StandBy) HReginServer(数据节点,多台) 2.依赖 HDFS集群、zookeeper集群【启动】
3.上传安装包解压 tar -zxvf hbase-1.2.1.bin.tar.gz -C apps/
4.进入conf目录修改配置文件如下 1.vi hbase-env.sh export JAVA_HOME=/usr/local/java/jdk1.8.0_231 export HBASE_MANAGES_ZK=false #自己的zookeeper,不启动它内置的 2.vi hbase-site.xml hbase.rootdir hdfs://hadoop01:9000/hbase hbase.cluster.distributed true hbase.zookeeper.quorum hadoop01:2181,hadoop02:2181,hadoop03:2181 hbase.master.info.port 60010 3.vi regionservers hadoop01 hadoop02 hadoop03
5.拷贝到其他机器 scp -r hbase-1.2.1/ hadoop02:$PWD scp -r hbase-1.2.1/ hadoop03:$PWD
6.启动前看看hdfs、zookeeper是否启动 # 1.一台一台启动 bin/hbase-daemon.sh start master bin/hbase-daemon.sh start regionserver # 2.全部启动,统一时间记得。要不同步会出现错误。 bin/start-hbase.sh # 3.要想启用备用的master,只需要在另外一台机器执行 bin/hbase-daemon.sh start master # 3.hadoop01:16010可以查看web页面
7.命令行客户端 体验即可,一般要用java程序访问 cd bin/hbase bin/hbase shell >status >建表等等。。。
8.java客户端
大数据
2020-03-27 11:24:00
Hive只在一个节点上安装即可
1.上传tar包
2.解压 tar -zxvf hive-1.2.1.tar.gz -C /apps/
3.安装mysql数据库(切换到root用户)(装在哪里没有限制,只有能联通hadoop集群的节点)
4.配置hive (a)配置HIVE_HOME环境变量 vi conf/hive-env.sh 配置其中的$hadoop_home 1.配置hive环境变量,编辑 vi /etc/profile #set hive env export HIVE_HOME=/root/apps/hive-1.2.1 export PATH=${HIVE_HOME}/bin:$PATH source /etc/profile 2.配置hadoop环境变量【安装hadoop时候已配置】
cd apps/hive-1.2.1/conf
4.1 cp hive-env.sh.template hive-env.sh
vi hive-env.sh 将以下内容写入到hive-env.sh文件中 export JAVA_HOME=/usr/local/java-1.8.231 export HADOOP_HOME=/root/apps/hadoop-2.6.5 export HIVE_HOME=/root/apps/hive-1.2.1
4.2 配置元数据
vi hive-site.xml
添加如下内容: javax.jdo.option.ConnectionURL jdbc:mysql://192.168.52.200:3306/hive?createDatabaseIfNotExist=true JDBC connect string for a JDBC metastore javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver Driver class name for a JDBC metastore javax.jdo.option.ConnectionUserName root username to use against metastore database javax.jdo.option.ConnectionPassword 123456 password to use against metastore database
5.安装hive和mysq完成后,将mysql的连接jar包拷贝到hive安装目录的/lib目录下 如果出现没有权限的问题,在mysql授权(在安装mysql的机器上执行) mysql -uroot -p #(执行下面的语句 *.*:所有库下的所有表 %:任何IP地址或主机都可以连接) GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'mysql' WITH GRANT OPTION; FLUSH PRIVILEGES; grant all privileges on *.* to root@"192.168.52.200" identified by "mysql" with grant option; FLUSH PRIVILEGES;
【注意】:需要配置hadoop和hive的环境变量,在启动hive之前需要启动hadoop的hdfs、yarn
6. Jline包版本不一致的问题,需要拷贝hive的lib目录中jline.2.12.jar的jar包替换掉hadoop中的
/apps/hadoop-2.6.4/share/hadoop/yarn/lib/jline-0.9.94.jar
7.1 启动hive程序 bin/hive
【注意】移植很强悍,只需要需改好的hive-1.2.1拷贝到其他机器,不许做任何修改,直接可启动用,如: scp -r hive-1.2.1/ hadoop02:/root/apps/
7.2 显示当前使用的库,和开启字段名称 set hive.cli.print.current.db=true; set hive.cli.print.header=true;
8.0如果启动hive服务、客户端呢??? # 启动服务hiveserver2 10000端口 bin/hiveserver2 # 这里没有后台运行 nohup bin/hiveserver2 1>/dev/null 2>&1 & # 后台运行服 # 开启beeline客户端 bin/beeline beeline>!connect jdbc:hive2://hadoop1:10000 用户名root 直接回车 客户端很好看!!!!! # 退出客户端 beeline>!quit
9.0 建内部表(默认是内部表default) 字段之间 , 分开的 create table trade_detail(id bigint, account string, income double, expenses double, time string) row format delimited fields terminated by ',';
9.1 建外部表
建外部表,任意目录,可以非/usr/warehouse/下,外部表删除后,hdfs中的数据目录不会删除。 create external table td_ext(id bigint, account string, income double, expenses double, time string) row format delimited fields terminated by ',' location '/lod/20190202/';
10. 创建分区表
10.1 普通表和分区表区别:有大量数据增加的需要建分区表
create table log (id bigint, url string) partitioned by (daytime string) row format delimited fields terminated by ',';
10.2 **导入本机数据到分区表 hdfs的hive仓库中 1.手动上传 2.hive命令,再次执行的话会追加数据 hive>load data local inpath '/root/log1.log/' log partition(daytime='20190904'); # 指定分区20190904
10.3 分区表加载数据 select * from log where daytime='20190904'
大数据
2020-03-27 11:19:00
判别分析(Discriminat Analysis) 概念:多元统计分析中判别样本所属类型的一种统计分析方法 方法:在已知的分类之下,对新的样本,利用此方法选定判别标准,以判断将该新样品置于哪个类中 种类 确定性判别:Fisher型 线性型 距离型 非线性型 概率性判别:Bayes型 概率型 损失型
线性判别函数(linear discriminatory function) 求Fisher线性判别函数 计算判别界值 建立判别标准
两总体距离判别 马氏距离(统计距离)
协方差矩阵 等方差阵(等价于Fisher判别)——直线判别 异方差阵——曲线判别(qda) 多总体距离判别 协方差矩阵相同——线性判别 协方差矩阵不相同——非线性判别
Bayes判别准则 概率判别 损失判别 概率最大、损失最小 正态总体的Bayes判别 Bayes判别函数求解 先验概率 密度函数 协方差矩阵 后验概率计算 #建立Fisher线性判别函数 d6.1=read.table('clipboard',header = T) d#基本统计分析 boxplot(x1~G,d6.1) t.test(x1~G,d6.1) boxplot(x2~G,d6.1) t.test(x2~G,d6.1) #Logistic模型分析 summary(glm(G-1~x1+x2,family = binomial,data=d6.1)) #Fisher判别分析 attach(d6.1) plot(d6.1$x1,d6.1$x2) text(d6.1$x1,d6.1$x2,adj=-0.5) library(MASS) ld=lda(G~x1+x2,d6.1) ld #判断 lp=predict(ld) G1=lp$class data.frame(G,G1) tab1=table(G,G1) tab1 sum(diag(prop.table(tab1))) #非线性判别模型 qd=qda(G~d6.1$x1+d6.1$x2) qp=predict(qd) G2=qp$class data.frame(G,G1,G2) #多类距离判别 d6.3=read.table('clipboard',header = T) #线性判别(等方差) ld3=lda(d6.3$G2~d6.3$Q+d6.3$C+d6.3$P) ld3 #异方差,二次判别 ld4=qda(G~d6.1$x1+d6.1$x2) ld4 #Bayes判别 ld41=lda(d6.3$G2~d6.3$Q+d6.3$C+d6.3$P,prior=c(1,1,1)/3) ld41 #先验概率不等 ld42=lda(d6.3$G2~d6.3$Q+d6.3$C+d6.3$P,prior=c(5,8,7)/20) ld42 #概率结果 predict(ld41,data.frame(Q=8,C=7.5,P=65)) predict(ld42,data.frame(Q=8,C=7.5,P=65))
参考资料: https://next.xuetangx.com/course/JNU07011000851/151569 ​​​​​​​
大数据
2020-03-27 10:05:00
去年,阿里云发布了本地 IDE 插件 Cloud Toolkit,仅 IntelliJ IDEA 一个平台,就有 15 万以上的开发者进行了下载,体验了一键部署带来的开发便利。时隔一年的今天,阿里云正式发布了 Visual Studio Code 版本,全面覆盖前端开发者,帮助前端实现一键打包部署,让开发提速 8 倍。
Cloud Toolkit 是免费的本地 IDE 插件,帮助开发者更高效地开发、测试、诊断并部署应用。通过插件,可以将本地应用一键部署到任意服务器,甚至云端(ECS、EDAS、ACK、ACR 和 小程序云 等);并且还内置了 Arthas 诊断、Dubbo工具、Terminal 终端、文件上传、函数计算 和 MySQL 执行器等工具。获查看详情: https://cn.aliyun.com/product/cloudtoolkit
VSCode 版本的插件,目前能做到什么?
安装插件之后,开发者可以立即体验以下任何一个功能: 将本地 Visual Studio Code 中的代码,一键打包、部署到任意的阿里云 ECS 服务器; 将本地 Visual Studio Code 中的代码,一键打包、部署到任意的远程服务器; 向任意的阿里云 ECS 或者 任意远程服务器 发送并执行指定的命令(即将发布); VSCode 版本的插件正不断更新迭代中,不久将会有更多强大的功能上线,请期待!或向我们提出需求!
如何下载插件?
开发者可以通过在线安装或离线安装方式来下载插件: 在线安装:从 Visual Studio Code 的 Marketplace 进行安装,访问插件页面: https://marketplace.visualstudio.com/items?itemName=alibabacloud-cloudtoolkit.toolkit-vscode&ssr=false#overview 离线安装:在插件的群里(文末扫码进群)获得离线安装包,进行离线安装;
阿里云 Cloud Toolkit 和其他任何插件安装方式一致,因此不做赘述,下面为大家详细介绍插件功能。
一键打包部署
一、添加服务器
如下图所示,在 Visual Studio Code 左侧,点击阿里云图标,在出现的菜单中,点击 Alibaba Cloud Toolkit - Host View 打开机器视图界面。
然后点击右上角 Add Host 按钮,出现添加机器界面,如下图。按照表单提示,填入对应的机器 IP,端口、用户名和密码即可。
二、部署
点击 Run Configurations - Deployments - Deploy to Host 创建一个新的部署配置,配置界面如下:
在 Deploy to Host 对话框设置部署参数,然后单击 Apply,即可保存。 部署参数说明: Name:未配置定义一个名字,以便更容易区分多个配置; File:选择打包方式; Project:选择待部署工程的根目录; Build Output Directory:打包之后的 Output 目录; Webpack Configuration:Webpack 配置; Target Host:部署的远程目标服务器; Target Directory:远程目标服务器上的指定部署目录; After deploy:输入应用启动命令,如 sh /root/restart.sh,表示在完成应用包的部署后,需要执行的命令 —— 对于 Java 程序而言,通常是一句 Tomcat 的启动命令。
查看更多:https://yq.aliyun.com/articles/743855?utm_content=g_1000103370
上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-11 20:57:00
安装kudu-python包参照官方文档:
https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/kudu_development.html
To install the Kudu Python client: Update all the packages on your system using the following command: sudo yum -y update Install the extra packages for the Enterprise Linux distribution: sudo yum -y install epel-release Install the Python package manager: sudo yum -y install python-pip Verify the version of the PIP manager that you just installed: pip --version Install Cython: sudo pip install cython Download the following files: wget http://username:password@archive.cloudera.com/p/cdh6/6.3.3/redhat7/yum/RPMS/x86_64/kudu-1.10.0+cdh6.3.x-1822539.x86_64.rpm wget http://username:password@archive.cloudera.com/p/cdh6/6.3.3/redhat7/yum/RPMS/x86_64/kudu-client-devel-1.10.0+cdh6.3.x-1822539.x86_64.rpm Install the kudu package from the local directory: sudo yum -y localinstall ./kudu-* Install the package used for developing Python extensions: sudo yum -y install python-devel Upgrade the setup tools: sudo pip install --upgrade pip setuptools Install the C++ compiler: sudo yum -y install gcc-c++ Install the Kudu-python client: sudo pip install kudu-python==1.10.0 Install kudu-python: sudo pip install kudu-python
上面,有个需要cloudera账户密码的wget地址。其实只要来这个地址,就可以不用登陆就能下载了:
https://archive.cloudera.com/cdh6/6.3.2/redhat7/yum/RPMS/x86_64/
实际上,要下载的包有三个,分别是:
kudu-1.10.0+cdh6.3.2-1605554.el7.x86_64.rpm 2019-11-12 13:43 161.00MB
kudu-client-devel-1.10.0+cdh6.3.2-1605554.el7.x86_64.rpm
kudu-client0-1.10.0+cdh6.3.2-1605554.el7.x86_64.rpm
2019-11-12 13:44
2019-11-12 13:43
57.00KB
4.00MB
就可以了。

我绕了一个弯路,按照官方文档装失败了。
然后就在usr/lib64下面,手动地添加了一个 libkudu_client.0.so 文件(软连接)
从我的CDH自动部署的Kudu项目里面找出来的。
https://community.cloudera.com/t5/Support-Questions/Using-kudu-with-Python/td-p/59185/
然后再使用pip或者setuptools工具,把kudu包装了上去。










大数据
2020-02-11 15:14:00
一文看懂pandas的透视表pivot_table
一、概述
1.1 什么是透视表?
透视表是一种可以对 数据动态排布并且分类汇总的表格格式。或许大多数人都在Excel使用过数据透视表,也体会到它的强大功能, 而在pandas中它被称作pivot_table。
1.2 为什么要使用pivot_table? 灵活性高,可以随意定制你的分析计算要求 脉络清晰易于理解数据 操作性强,报表神器
二、如何使用pivot_table
首先读取数据,数据集是火箭队当家球星James Harden某一赛季比赛数据作为数据集进行讲解。 数据地址 。
先看一下官方文档中pivot_table的函数体: pandas.pivot_table - pandas 0.21.0 documentation
pivot_table(data, values=None, index=None, columns=None,aggfunc='mean', fill_value=None, margins=False, dropna=True, margins_name='All')
pivot_table有四个最重要的参数 index、values、columns、aggfunc ,本文以这四个参数为中心讲解pivot操作是如何进行。
2.1 读取数据 import pandas as pd import numpy as np df = pd.read_csv( 'h:/James_Harden.csv',encoding= 'utf8') df.tail()
数据格式如下:
2.2 Index
每个pivot_table必须拥有一个index, 如果想查看哈登对阵每个队伍的得分,首先我们将对手设置为 index : pd.pivot_table(df,index=[ u'对手' ])
对手成为了第一层索引,还想看看对阵同一对手在不同主客场下的数据,试着将对手与胜负与主客场都设置为index,其实就变成为了两层索引 pd.pivot_table(df,index=[ u'对手' , u'主客场' ])

试着交换下它们的顺序,数据结果一样: pd.pivot_table(df,index=[ u'主客场' , u'对手' ])

看完上面几个操作, Index就是层次字段,要通过透视表获取什么信息就按照相应的顺序设置字段 ,所以在进行pivot之前你也需要足够了解你的数据。
2.3 Values
通过上面的操作,我们获取了james harden在对阵对手时的所有数据, 而Values可以对需要的计算数据进行筛选,如果我们只需要james harden在主客场和不同胜负情况下的得分、篮板与助攻三项数据: pd.pivot_table(df,index=[ u'主客场' , u'胜负' ],values=[ u'得分' , u'助攻' , u'篮板' ])

2.4 Aggfunc
aggfunc参数可以设置我们对数据聚合时进行的函数操作。
当我们未设置aggfunc时,它默认 aggfunc='mean' 计算均值。我们还想要获得james harden在主客场和不同胜负情况下的总得分、总篮板、总助攻时: pd.pivot_table(df,index=[ u'主客场' , u'胜负' ],values=[ u'得分' , u'助攻' , u'篮板' ],aggfunc=[np.sum,np.mean])

2.5 Columns
Columns类似Index可以设置列层次字段,它不是一个必要参数,作为一种分割数据的可选方式。 #fill_value填充空值,margins=True进行汇总 pd.pivot_table(df,index=[ u'主客场'],columns=[ u'对手'],values=[ u'得分'],aggfunc=[np.sum], fill_value= 0,margins= 1 )

现在我们已经把关键参数都介绍了一遍,下面是一个综合的例子: table=pd.pivot_table(df,index=[ u'对手' , u'胜负' ],columns=[ u'主客场' ],values=[ u'得分' , u'助攻' , u'篮板' ],aggfunc=[np.mean],fill_value= 0 )
结果如下:

aggfunc也可以使用dict类型,如果dict中的内容与values不匹配时,以dict中为准。
table=pd.pivot_table(df,index=[ u'对手', u'胜负'],columns=[ u'主客场'],values=[ u'得分', u'助攻', u'篮板'],aggfunc={ u'得分':np.mean, u'助攻':[min, max, np.mean]},fill_value= 0)
结果就是助攻求min,max和mean,得分求mean,而篮板没有显示。
大数据
2020-02-10 22:55:00
# default_exp china # 上面一行用于nbdev中声明本模块的名称。必须是notebook的第一个Cell的第一行。
china 描述:抗击新冠病毒(6)-# 全国总体情况分析。 功能:载入data/china.csv文件,进行绘图输出和分析。 模块:使用JupyterLab、Python、nbdev等完成。用到的Python模块包括: re,正则表达式解析。 json,JSON格式解析。 pandas,数据表格分析。 数据来源: http://www.nhc.gov.cn/xcs/yqtb/list_gzbd.shtml 源码-https://github.com/openthings/anti2020ncov 参考: JupyterLab-数据实验室 文学式编程-nbdev入门教程 Pandas快速入门 更多参考: https://my.oschina.net/u/2306127?q=pandas 抗击新冠病毒(1)-开源软件与数据项目 抗击新冠病毒(2)-基于Jupyter+nbdev的数据分析 抗击新冠病毒(3)-探索在线数据资源 抗击新冠病毒(4)-获取并保存在线数据 import pandas as pd import numpy as np import matplotlib.pyplot as plt from matplotlib.font_manager import * china = pd.read_csv("./data/china.csv") china = china.sort_values(by='日期')
趋势图绘制 # 绘制增加量趋势图 def draw(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc',size=24) fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['日期'], rotation=15, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['日期'],dfx['新增确诊'],color='red',linewidth=3,label='新增确诊') p1.plot(dfx['日期'],dfx['新增疑似'],color='#BB0000',linewidth=3,label='新增疑似') p1.plot(dfx['日期'],dfx['新增治愈'],color='green',linewidth=3,label='新增治愈') p1.plot(dfx['日期'],dfx['新增重症'],color='#660000',linewidth=3,label='新增重症') p1.bar(dfx['日期'],dfx['新增死亡'],color='black',label='新增死亡') plt.title(u'全国新增病例数量(NCP)-2020年01-02月',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show() # 绘制累计指标趋势图 def drawa(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc',size=24) fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['日期'], rotation=15, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['日期'],dfx['累计确诊'],color='red',linewidth=3,label='累计确诊') p1.plot(dfx['日期'],dfx['现有疑似'],color='#BB0000',linewidth=3,label='现有疑似') p1.plot(dfx['日期'],dfx['累计治愈'],color='green',linewidth=3,label='累计治愈') p1.bar(dfx['日期'],dfx['累计死亡'],color='black',label='累计死亡') plt.title(u'全国累计病例数量(NCP)-2020年01-02月',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show()
全国新增病例数量(NCP)-2020年01-02月 draw(china)
全国累计病例数量(NCP)-2020年01-02月 drawa(china)
大数据
2020-02-10 22:49:00
# default_exp province # 上面一行用于nbdev中声明本模块的名称。必须是notebook的第一个Cell的第一行。
province 描述:抗击新冠病毒(5)-# 使用pandas进行数据分析. 功能:载入data/china.csv文件,进行绘图输出和分析。 模块:使用JupyterLab、Python、nbdev等完成。用到的Python模块包括: re,正则表达式解析。 json,JSON格式解析。 pandas,数据表格分析。 源码-https://github.com/openthings/anti2020ncov 参考: JupyterLab-数据实验室 文学式编程-nbdev入门教程 Pandas快速入门 更多参考: https://my.oschina.net/u/2306127?q=pandas 抗击新冠病毒(1)-开源软件与数据项目 抗击新冠病毒(2)-基于Jupyter+nbdev的数据分析 抗击新冠病毒(3)-探索在线数据资源 抗击新冠病毒(4)-获取并保存在线数据 import pandas as pd import numpy as np import matplotlib.pyplot as plt from matplotlib.font_manager import * prov = pd.read_csv("./data/prov_20200209.csv") #prov #prov['省份'].values.tolist()
各省的病例情况图 #中文乱码问题,https://www.linuxidc.com/Linux/2019-03/157632.htm def draw(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc') fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['省份'], rotation=30, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['省份'],dfx['确诊'],color='red',linewidth=3,label='确诊') p1.plot(dfx['省份'],dfx['治愈'],color='green',linewidth=3,label='治愈') p1.bar(dfx['省份'],dfx['死亡'],color='black',label='死亡') plt.title(u'各省病例数量-2020/02/08',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show() # 绘制各省的病例情况图。 draw(prov)
分省区的地市病例情况图 city = pd.read_csv("./data/city_20200208.csv") #city # 绘制指定省份的地市病例情况图。 def drawc(province): dfx = city[city['省份']==province] #print(dfx) #查询Linux系统的可用字体:fc-list :lang=zh myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc') fig = plt.figure(figsize=(48,6), dpi=250) p1 = fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['城市'], rotation=30, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['城市'],dfx['确诊'],color='red',linewidth=3,label='确诊') p1.plot(dfx['城市'],dfx['治愈'],color='green',linewidth=3,label='治愈') p1.bar(dfx['城市'],dfx['死亡'],color='black',label='死亡') plt.title(province + u'各地市病例数量-2020/02/08',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show() return dfx # 绘制所有省的所有地市统计图。 ind = 1 for each in prov['省份']: print(ind, each) ind = ind + 1 try: result = drawc(each) except: print("ERROR!")
输出图形如下:
大数据
2020-02-10 19:21:00
# default_exp getdata # 上面一行用于nbdev中声明本模块的名称。必须是notebook的第一个Cell的第一行。
getdata 描述:抗击新冠病毒(4)-获取并保存在线数据。 功能:获取数据,转换为列表格式,保存到*.c s v文件,用于后续的分析和绘图输出。 模块:使用JupyterLab、Python、nbdev等完成。用到的Python模块包括: requests,访问web服务网站。 re,正则表达式解析。 json,JSON格式解析。 BeautifulSoup,HTML格式解析。 pprint,格式化输出。 pandas,数据表格分析。 源码-https://github.com/openthings/anti2020ncov 参考: JupyterLab-数据实验室 文学式编程-nbdev入门教程 抗击新冠病毒(1)-开源软件与数据项目 抗击新冠病毒(2)-基于Jupyter+nbdev的数据分析 抗击新冠病毒(3)-探索在线数据资源 #hide from nbdev.showdoc import * #export from bs4 import BeautifulSoup from parser import * #regex_parser import re import json import time import logging import datetime import requests import pprint
预设网络地址和创建请求头。 丁香园-新型冠状病毒肺炎疫情实时动态, https://ncov.dxy.cn/ncovh5/view/pneumonia?from=singlemessage&isappinstalled=0 中国疾控中心, http://www.chinacdc.cn , 分布图 http://2019ncov.chinacdc.cn/2019-nCoV/ 全国卫健委-新型冠状病毒肺炎疫情防控, http://www.nhc.gov.cn/xcs/xxgzbd/gzbd_index.shtml 全国卫健委-官方网站, http://www.nhc.gov.cn 世界卫生组织(WHO), https://www.who.int/zh # export # 使用丁香园的数据。 url = "https://ncov.dxy.cn/ncovh5/view/pneumonia?from=singlemessage&isappinstalled=0" headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36' } #保存的文件名。 dateof = '20200207'
从Web服务获取网页,并解析为JSON格式。 Python之json使用 #export def getweb(): session = requests.session() session.headers.update(headers) r = session.get(url) soup = BeautifulSoup(r.content, 'lxml') #获取省市数据。 area_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getAreaStat'}))) area = json.loads(area_information.group(0)) return area
保存分省的数据到data/prov_日期.csv文件。 # export # 写入文件,分省数据。 def saveprovice(area): fprovince = "data/" + "prov_" + dateof + ".csv" fp = open(fprovince, "w") fp.write("省份,确诊,疑似,治愈,死亡\r") for a in area: fp.write(a['provinceName']+','+ \ str(a['confirmedCount'])+','+ \ str(a['suspectedCount'])+','+ \ str(a['curedCount'])+','+ \ str(a['deadCount'])+ '\r') fp.close() print("writed to "+ fprovince + "\r\n")
保存分市的数据到文件 data/city_日期.csv。 # export # 写入文件,分市数据。 def savecity(area): fcity = "data/" + "city_" + dateof + ".csv" fc = open(fcity, "w") fc.write("省份,城市,确诊,疑似,治愈,死亡\r") for p in area: cities = p['cities'] for c in cities: fc.write(p['provinceName']+','+ \ c['cityName']+','+ \ str(c['confirmedCount'])+','+ \ str(c['suspectedCount'])+','+ \ str(c['curedCount'])+','+ \ str(c['deadCount'])+'\r') fc.close() print("writed to "+ fcity + "\r\n")
查看保存的文件。
这里用到Notebook的魔法操作符,参考: IPython 6/Jupyter的magic操作符 IPython的Magics魔法操作符 !ls -l data 总用量 176 -rw-r--r-- 1 supermap supermap 1445 2月 9 22:49 china.csv -rw-r--r-- 1 supermap supermap 11840 2月 7 23:02 city_20200207.csv -rw-r--r-- 1 supermap supermap 12156 2月 8 18:39 city_20200208.csv -rw-r--r-- 1 supermap supermap 12169 2月 10 18:20 city_20200209.csv -rw-r--r-- 1 supermap supermap 126285 2月 9 15:09 IMG_7082.JPG -rw-r--r-- 1 supermap supermap 780 2月 7 23:02 prov_20200207.csv -rw-r--r-- 1 supermap supermap 784 2月 8 18:39 prov_20200208.csv -rw-r--r-- 1 supermap supermap 790 2月 10 18:20 prov_20200209.csv
nbdev 适用工具 # 将notebook转化为python的*.py代码,保存到项目名称的子目录中。 from nbdev.export import * notebook2script() Converted 00_digdata.ipynb. Converted 01_getdata.ipynb. Converted 10_charts.ipynb. Converted 10_china.ipynb. Converted index.ipynb.
大数据
2020-02-10 19:00:00
matplotlib是常用的绘图库,支持python及在Jupyter Notebook下使用,也支持最新的JupyterLab环境。这里介绍matplotlib的字体设置方法以及绘图线型、符号和颜色的设置。
1、中文字体 关于中文乱码问题, https://www.linuxidc.com/Linux/2019-03/157632.htm
Linux下查找中文字体,使用命令: fc-list :lang=zh
绘图函数参考下面: def draw(dfx): myfont = FontProperties(fname='/usr/share/fonts/truetype/arphic/ukai.ttc',size=24) fig=plt.figure(figsize=(48,12), dpi=250) p1=fig.add_subplot(1,1,1) p1.set_xticklabels(dfx['日期'], rotation=15, fontsize='small',fontproperties=myfont) #显示数据。 p1.plot(dfx['日期'],dfx['新增确诊'],color='red',linewidth=3,label='新增确诊') p1.bar(dfx['日期'],dfx['新增死亡'],color='black',label='新增死亡') plt.title(u'全国新增病例数量(NCP)-2020年01-02月',fontproperties=myfont) plt.legend(loc=0,ncol=1,prop=myfont) plt.grid(True) plt.gcf().autofmt_xdate() plt.show()
需要字体设置的主要有标题、图例和轴上的标签。先创建一个myfont对象,然后在title、legend、set_xticklabel放进去即可,注意legend的参数是prop。
2、图形绘制
这里给出个简单的使用matplotlib绘图例子,使用了颜色、线宽、线型符号等风格样式。 import matplotlib.pyplot as plt from matplotlib.font_manager import * x= range(100) y= [i**2 for i in x] plt.subplots(1, 1) plt.plot(x, y, linewidth = '1', label = 'Example', color='coral', linestyle=':', marker='|') plt.legend(loc='upper left') plt.show()
3、线型表
linestyle可选参数: '-' solid line style '--' dashed line style '-.' dash-dot line style ':' dotted line style
4、符号表 marker可选参数: '.' point marker ',' pixel marker 'o' circle marker 'v' triangle_down marker '^' triangle_up marker '<' triangle_left marker '>' triangle_right marker '1' tri_down marker '2' tri_up marker '3' tri_left marker '4' tri_right marker 's' square marker 'p' pentagon marker '*' star marker 'h' hexagon1 marker 'H' hexagon2 marker '+' plus marker 'x' x marker 'D' diamond marker 'd' thin_diamond marker '|' vline marker '_' hline marker
5、颜色表
color可用的颜色: cnames = { 'aliceblue': '#F0F8FF', 'antiquewhite': '#FAEBD7', 'aqua': '#00FFFF', 'aquamarine': '#7FFFD4', 'azure': '#F0FFFF', 'beige': '#F5F5DC', 'bisque': '#FFE4C4', 'black': '#000000', 'blanchedalmond': '#FFEBCD', 'blue': '#0000FF', 'blueviolet': '#8A2BE2', 'brown': '#A52A2A', 'burlywood': '#DEB887', 'cadetblue': '#5F9EA0', 'chartreuse': '#7FFF00', 'chocolate': '#D2691E', 'coral': '#FF7F50', 'cornflowerblue': '#6495ED', 'cornsilk': '#FFF8DC', 'crimson': '#DC143C', 'cyan': '#00FFFF', 'darkblue': '#00008B', 'darkcyan': '#008B8B', 'darkgoldenrod': '#B8860B', 'darkgray': '#A9A9A9', 'darkgreen': '#006400', 'darkkhaki': '#BDB76B', 'darkmagenta': '#8B008B', 'darkolivegreen': '#556B2F', 'darkorange': '#FF8C00', 'darkorchid': '#9932CC', 'darkred': '#8B0000', 'darksalmon': '#E9967A', 'darkseagreen': '#8FBC8F', 'darkslateblue': '#483D8B', 'darkslategray': '#2F4F4F', 'darkturquoise': '#00CED1', 'darkviolet': '#9400D3', 'deeppink': '#FF1493', 'deepskyblue': '#00BFFF', 'dimgray': '#696969', 'dodgerblue': '#1E90FF', 'firebrick': '#B22222', 'floralwhite': '#FFFAF0', 'forestgreen': '#228B22', 'fuchsia': '#FF00FF', 'gainsboro': '#DCDCDC', 'ghostwhite': '#F8F8FF', 'gold': '#FFD700', 'goldenrod': '#DAA520', 'gray': '#808080', 'green': '#008000', 'greenyellow': '#ADFF2F', 'honeydew': '#F0FFF0', 'hotpink': '#FF69B4', 'indianred': '#CD5C5C', 'indigo': '#4B0082', 'ivory': '#FFFFF0', 'khaki': '#F0E68C', 'lavender': '#E6E6FA', 'lavenderblush': '#FFF0F5', 'lawngreen': '#7CFC00', 'lemonchiffon': '#FFFACD', 'lightblue': '#ADD8E6', 'lightcoral': '#F08080', 'lightcyan': '#E0FFFF', 'lightgoldenrodyellow': '#FAFAD2', 'lightgreen': '#90EE90', 'lightgray': '#D3D3D3', 'lightpink': '#FFB6C1', 'lightsalmon': '#FFA07A', 'lightseagreen': '#20B2AA', 'lightskyblue': '#87CEFA', 'lightslategray': '#778899', 'lightsteelblue': '#B0C4DE', 'lightyellow': '#FFFFE0', 'lime': '#00FF00', 'limegreen': '#32CD32', 'linen': '#FAF0E6', 'magenta': '#FF00FF', 'maroon': '#800000', 'mediumaquamarine': '#66CDAA', 'mediumblue': '#0000CD', 'mediumorchid': '#BA55D3', 'mediumpurple': '#9370DB', 'mediumseagreen': '#3CB371', 'mediumslateblue': '#7B68EE', 'mediumspringgreen': '#00FA9A', 'mediumturquoise': '#48D1CC', 'mediumvioletred': '#C71585', 'midnightblue': '#191970', 'mintcream': '#F5FFFA', 'mistyrose': '#FFE4E1', 'moccasin': '#FFE4B5', 'navajowhite': '#FFDEAD', 'navy': '#000080', 'oldlace': '#FDF5E6', 'olive': '#808000', 'olivedrab': '#6B8E23', 'orange': '#FFA500', 'orangered': '#FF4500', 'orchid': '#DA70D6', 'palegoldenrod': '#EEE8AA', 'palegreen': '#98FB98', 'paleturquoise': '#AFEEEE', 'palevioletred': '#DB7093', 'papayawhip': '#FFEFD5', 'peachpuff': '#FFDAB9', 'peru': '#CD853F', 'pink': '#FFC0CB', 'plum': '#DDA0DD', 'powderblue': '#B0E0E6', 'purple': '#800080', 'red': '#FF0000', 'rosybrown': '#BC8F8F', 'royalblue': '#4169E1', 'saddlebrown': '#8B4513', 'salmon': '#FA8072', 'sandybrown': '#FAA460', 'seagreen': '#2E8B57', 'seashell': '#FFF5EE', 'sienna': '#A0522D', 'silver': '#C0C0C0', 'skyblue': '#87CEEB', 'slateblue': '#6A5ACD', 'slategray': '#708090', 'snow': '#FFFAFA', 'springgreen': '#00FF7F', 'steelblue': '#4682B4', 'tan': '#D2B48C', 'teal': '#008080', 'thistle': '#D8BFD8', 'tomato': '#FF6347', 'turquoise': '#40E0D0', 'violet': '#EE82EE', 'wheat': '#F5DEB3', 'white': '#FFFFFF', 'whitesmoke': '#F5F5F5', 'yellow': '#FFFF00', 'yellowgreen': '#9ACD32'}
颜色样本如下:
另外:
如果安装了seaborn扩展的话,在字典seaborn.xkcd_rgb中包含所有的xkcd crowdsourced color names。使用方法如下: plt.plot([1,2], lw=4, c=seaborn.xkcd_rgb['baby poop green'])
所有颜色表如下:

大数据
2020-02-10 15:57:00
前言
首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute) : 函数计算 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。函数计算更多信息 参考 。
Fun : Fun 是一个用于支持 Serverless 应用部署的工具,能帮助您便捷地管理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件(template.yml),协助您进行开发、构建、部署操作。Fun 的更多文档 参考 。
备注: 本文介绍的技巧需要 Fun 版本大于等于 3.5.0。
依赖工具
本项目是在 MacOS 下开发的,涉及到的工具是平台无关的,对于 Linux 和 Windows 桌面系统应该也同样适用。在开始本例之前请确保如下工具已经正确的安装,更新到最新版本,并进行正确的配置。 Docker Fun
Fun 工具依赖于 docker 来模拟本地环境。
对于 MacOS 用户可以使用 homebrew 进行安装: brew cask install docker brew tap vangie/formula brew install fun
Windows 和 Linux 用户安装请参考: https://github.com/aliyun/fun/blob/master/docs/usage/installation.md https://github.com/aliyun/fcli/releases
安装好后,记得先执行 fun config 初始化一下配置。
初始化
使用 fun init 命令可以快捷的将本模板项目初始化到本地。 fun init vangie/puppeteer-example
安装依赖 fun install
fun install 会执行 Funfile 文件里的指令,依次执行如下任务: 安装 chrome headless 二进制文件; 安装 puppeteer 依赖的 apt 包; 安装 npm 依赖。
部署
同步大文件到 nas 盘: fun nas sync
部署代码: $ fun deploy using template: template.yml using region: cn-hangzhou using accountId: *********** 3743 using accessKeyId: ***********Ptgk using timeout: 600 Waiting for service puppeteer to be deployed... make sure role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is exist role 'aliyunfcgeneratedrole-cn-hangzhou-puppeteer' is already exist attaching police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer attached police 'AliyunECSNetworkInterfaceManagementAccess' to role: aliyunfcgeneratedrole-cn-hangzhou-puppeteer using 'VpcConfig: Auto' , Fun will try to generate related vpc resources automatically vpc already generated, vpcId is : vpc-bp1wv9al02opqahkizmvr vswitch already generated, vswitchId is : vsw-bp1kablus0jrcdeth8v35 security group already generated, security group is : sg-bp1h2swzeb5vgjfu6gpo generated auto VpcConfig done: { "vpcId" : "vpc-bp1wv9al02opqahkizmvr" , "vswitchIds" :[ "vsw-bp1kablus0jrcdeth8v35" ], "securityGroupId" : "sg-bp1h2swzeb5vgjfu6gpo" } using 'NasConfig: Auto' , Fun will try to generate related nas file system automatically nas file system already generated, fileSystemId is : 0825 a4a395 nas file system mount target is already created, mountTargetDomain is : 0825 a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com generated auto NasConfig done: { "UserId" : 10003 , "GroupId" : 10003 , "MountPoints" :[{ "ServerAddr" : "0825a4a395-rrf16.cn-hangzhou.nas.aliyuncs.com:/puppeteer" , "MountDir" : "/mnt/auto" }]} Checking if nas directories /puppeteer exists, if not, it will be created automatically Checking nas directories done [ "/puppeteer" ] Waiting for function html2png to be deployed... Waiting for packaging function html2png code... The function html2png has been packaged. A total of 7 files files were compressed and the final size was 2.56 KB Waiting for HTTP trigger httpTrigger to be deployed... triggerName: httpTrigger methods: [ 'GET' ] url: https: //xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ Http Trigger will forcefully add a 'Content-Disposition: attachment' field to the response header, which cannot be overwritten and will cause the response to be downloaded as an attachment in the browser. This issue can be avoided by using CustomDomain. trigger httpTrigger deploy success function html2png deploy success service puppeteer deploy success ===================================== Tips for nas resources ================================================== Fun has detected the .nas.yml file in your working directory, which contains the local directory: /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/.fun/root /Users/vangie/Workspace/puppeteer-example/{{ projectName }}/node_modules The above directories will be automatically ignored when 'fun deploy' . Any content of the above directories changes,you need to use 'fun nas sync' to sync local resources to remote. ===============================================================================================================
验证 curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/ > screenshot.png
如果不传递查询参数,默认会截取阿里云的首页。
如果想换一个网址,可以使用如下命令格式: curl https://xxxxxx.cn-hangzhou.fc.aliyuncs.com/2016-08-15/proxy/puppeteer/html2png/?url=http://www.alibaba.com > screenshot.png
调试
如果需要在本地调试代码,可以使用如下命令: $ fun local start using template : template.yml HttpTrigger httpTrigger of puppeteer/html2png was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/puppeteer/html2png methods: [ 'GET' ] authType: ANONYMOUS function compute app listening on port 8000 !
浏览器打开 http://localhost:8000/2016-08-15/proxy/puppeteer/html2png 即可。

查看更多:https://yq.aliyun.com/articles/743644?utm_content=g_1000103099
上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-10 15:55:00
这是一个包含了 函数计算 每种 Runtime 结合 HTTP Trigger 实现文件上传和文件下载的示例集。每个示例包括: 一个公共 HTML 页面,该页面有一个文件选择框和上传按钮,会列出已经上传的文件,点击某个已上传的文件可以把文件下载下来; 支持文件上传、下载和列举的函数。
我们知道不同语言在处理 HTTP 协议上传下载时都有很多中方法和社区库,特别是结合函数计算的场景,开发人员往往需要耗费不少精力去学习和尝试。本示例集编撰的目的就是节省开发者甄别的精力和时间,为每种语言提供一种有效且符合社区最佳实践的方法,可以拿来即用。
当前已支持的 Runtime 包括: nodejs python php java
计划支持的 Runtime 包括: dotnetcore
不打算支持的 Runtime 包括: custom
使用限制
由于函数计算对于 HTTP 的 Request 和 Response 的 Body 大小限制均为 6M,所以该示例集只适用于借助函数计算上传和下载文件小于 6M 的场景。对于大于 6M 的情况,可以考虑如下方法: 分片上传 ,把文件切分成小块,上传以后再拼接起来; 借助于 OSS ,将文件先上传 OSS,函数从 OSS 上下载文件,处理完以后回传 OSS; 借助于 NAS ,将大文件放在 NAS 网盘上,函数可以像读写普通文件系统一样访问 NAS 网盘的文件。
快速开始
安装依赖
在开始之前请确保开发环境已经安装了如下工具: docker funcraft git make
构建并启动函数
克隆代码: git clone https://github.com/vangie/ fc -file-transfer
本地启动函数: $ make start ... HttpTrigger httpTrigger of file -transfer/nodejs was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/nodejs methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/python was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/python methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/ java was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/ java methods: [ 'GET' , 'POST' ] authType: ANONYMOUS HttpTrigger httpTrigger of file -transfer/php was registered url : http ://localhost: 8000 / 2016 -08 -15 /proxy/ file -transfer/php methods: [ 'GET' , 'POST' ] authType: ANONYMOUS function compute app listening on port 8000 !
make start 命令会调用 Makefile 文件中的指令,通过 fun local 在本地的 8000 端口开放 HTTP 服务,控制台会打印出每个 HTTP Trigger 的 URL 、支持的 HTTP 方法,以及认证方式。
效果演示
上面四个 URL 地址随便选一个在浏览器中打开示例页面。
接口说明
所有示例都实现了下述四个 HTTP 接口: GET / 返回文件上传 Form 的 HTML 页面 GET /list 以 JSON 数组形式返回文件列表 POST /upload 以 multipart/form-data 格式上传文件 fileContent 作为文件字段 fileName 作为文件名字段 GET /download?filename=xxx 以 application/octet-stream 格式返回文件内容。
此外为了能正确的计算相对路径,在访问根路径时如果不是以 / 结尾,都会触发一个 301 跳转,在 URL 末尾加上一个 / 。
不同语言的示例代码 nodejs python php java
已知问题 文件大小 限制 fun local 实现存在已知问题,上传过大的文件会自动退出,未来的版本会修复。 部署到线上需要绑定 自定义域名 才能使用,否则 HTML 文件在浏览器中会被 强制下载 而不是直接渲染。

查看更多:https://yq.aliyun.com/articles/743642?utm_content=g_1000103098
上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-10 15:42:00
导读: 容器存储是 Kubernetes 系统中提供数据持久化的基础组件,是实现有状态服务的重要保证。Kubernetes 默认提供了主流的存储卷接入方案(In-Tree),同时也提供了插件机制(Out-Of-Tree),允许其他类型的存储服务接入 Kubernetes 系统服务。本文将从 Kubernetes 存储架构、存储插件原理、实现等方面进行讲解,希望大家有所收获。
一、Kubernetes 存储体系架构
引例: 在 Kubernetes 中挂载一个 Volume
首先以一个 Volume 的挂载例子来作为引入。
如下图所示,左边的 YAML 模板定义了一个 StatefulSet 的一个应用,其中定义了一个名为 disk-pvc 的 volume,挂载到 Pod 内部的目录是 /data。disk-pvc 是一个 PVC 类型的数据卷,其中定义了一个 storageClassName。
因此这个模板是一个典型的动态存储的模板。右图是数据卷挂载的过程,主要分为 6 步:
第一步 :用户创建一个包含 PVC的 Pod; 第二步 :PV Controller 会不断观察 ApiServer,如果它发现一个 PVC 已经创建完毕但仍然是未绑定的状态,它就会试图把一个 PV 和 PVC 绑定;
PV Controller 首先会在集群内部找到一个适合的 PV 进行绑定,如果未找到相应的 PV,就调用 Volume Plugin 去做 Provision。Provision 就是从远端上一个具体的存储介质创建一个 Volume,并且在集群中创建一个 PV 对象,然后将此 PV 和 PVC 进行绑定; 第三步 :通过 Scheduler 完成一个调度功能;
我们知道,当一个 Pod 运行的时候,需要选择一个 Node,这个节点的选择就是由 Scheduler 来完成的。Scheduler 进行调度的时候会有多个参考量,比如 Pod 内部所定义的 nodeSelector、nodeAffinity 这些定义以及 Volume 中所定义的一些标签等。
我们可以在数据卷中添加一些标签,这样使用这个 pv 的 Pod 就会由于标签的限制,被调度器调度到期望的节点上。
第四步 :如果有一个 Pod 调度到某个节点之后,它所定义的 PV 还没有被挂载(Attach),此时 AD Controller 就会调用 VolumePlugin,把远端的 Volume 挂载到目标节点中的设备上(如:/dev/vdb); 第五步: 当 Volum Manager 发现一个 Pod 调度到自己的节点上并且 Volume 已经完成了挂载,它就会执行 mount 操作,将本地设备(也就是刚才得到的 /dev/vdb)挂载到 Pod 在节点上的一个子目录中。同时它也可能会做一些像格式化、是否挂载到 GlobalPath 等这样的附加操作。 第六步 :绑定操作,就是将已经挂载到本地的 Volume 映射到容器中。
Kubernetes 的存储架构
接下来,我们一起看一下 Kubernetes 的存储架构。
PV Controller : 负责 PV/PVC 的绑定、生命周期管理,并根据需求进行数据卷的 Provision/Delete 操作; AD Controller :负责存储设备的 Attach/Detach 操作,将设备挂载到目标节点; Volume Manager :管理卷的 Mount/Unmount 操作、卷设备的格式化以及挂载到一些公用目录上的操作; Volume Plugins :它主要是对上面所有挂载功能的实现;
PV Controller、AD Controller、Volume Manager 主要是进行操作的调用,而具体操作则是由 Volume Plugins 实现的。 Scheduler :实现对 Pod 的调度能力,会根据一些存储相关的的定义去做一些存储相关的调度;
接下来,我们分别介绍上面这几部分的功能。
PV Controller
首先我们先来回顾一下几个基本概念: Persistent Volume (PV) : 持久化存储卷,详细定义了预挂载存储空间的各项参数;
例如,我们去挂载一个远端的 NAS 的时候,这个 NAS 的具体参数就要定义在 PV 中。一个 PV 是没有 NameSpace 限制的,它一般由 Admin 来创建与维护; Persistent Volume Claim (PVC) :持久化存储声明;
它是用户所使用的存储接口,对存储细节无感知,主要是定义一些基本存储的 Size、AccessMode 参数在里面,并且它是属于某个 NameSpace 内部的。 StorageClass :存储类;
一个动态存储卷会按照 StorageClass 所定义的模板来创建一个 PV,其中定义了创建模板所需要的一些参数和创建 PV 的一个 Provisioner(就是由谁去创建的)。
PV Controller 的主要任务就是完成 PV、PVC 的生命周期管理,比如创建、删除 PV 对象,负责 PV、PVC 的状态迁移;另一个任务就是绑定 PVC 与 PV 对象,一个 PVC 必须和一个 PV 绑定后才能被应用使用,它们是一一绑定的,一个 PV 只能被一个 PVC 绑定,反之亦然。

接下来,我们看一下一个 PV 的状态迁移图。
创建好一个 PV 以后,我们就处于一个 Available 的状态,当一个 PVC 和一个 PV 绑定的时候,这个 PV 就进入了 Bound 的状态,此时如果我们把 PVC 删掉,Bound 状态的 PV 就会进入 Released 的状态。
一个 Released 状态的 PV 会根据自己定义的 ReclaimPolicy 字段来决定自己是进入一个 Available 的状态还是进入一个 Deleted 的状态。如果 ReclaimPolicy 定义的是 "recycle" 类型,它会进入一个 Available 状态,如果转变失败,就会进入 Failed 的状态。
相对而言,PVC 的状态迁移图就比较简单。
一个创建好的 PVC 会处于 Pending 状态,当一个 PVC 与 PV 绑定之后,PVC 就会进入 Bound 的状态,当一个 Bound 状态的 PVC 的 PV 被删掉之后,该 PVC 就会进入一个 Lost 的状态。对于一个 Lost 状态的 PVC,它的 PV 如果又被重新创建,并且重新与该 PVC 绑定之后,该 PVC 就会重新回到 Bound 状态。
下图是一个 PVC 去绑定 PV 时对 PV 筛选的一个流程图。就是说一个 PVC 去绑定一个 PV 的时候,应该选择一个什么样的 PV 进行绑定。
首先 它会检查 VolumeMode 这个标签,PV 与 PVC 的 VolumeMode 标签必须相匹配。VolumeMode 主要定义的是我们这个数据卷是文件系统 (FileSystem) 类型还是一个块 (Block) 类型; 第二个部分 是 LabelSelector。当 PVC 中定义了 LabelSelector 之后,我们就会选择那些有 Label 并且与 PVC 的 LabelSelector 相匹配的 PV 进行绑定; 第三个部分 是 StorageClassName 的检查。如果 PVC 中定义了一个 StorageClassName,则必须有此相同类名的 PV 才可以被筛选中。
这里再具体解释一下 StorageClassName 这个标签,该标签的目的就是说,当一个 PVC 找不到相应的 PV 时,我们就会用该标签所指定的 StorageClass 去做一个动态创建 PV 的操作,同时它也是一个绑定条件,当存在一个满足该条件的 PV 时,就会直接使用现有的 PV,而不再去动态创建。 第四个部分 是 AccessMode 检查。
AccessMode 就是平时我们在 PVC 中定义的如 "ReadWriteOnce"、"RearWriteMany" 这样的标签。该绑定条件就是要求 PVC 和 PV 必须有匹配的 AccessMode,即 PVC 所需求的 AccessMode 类型,PV 必须具有。 最后 一个部分是 Size 的检查。
一个 PVC 的 Size 必须小于等于 PV 的 Size,这是因为 PVC 是一个声明的 Volume,实际的 Volume 必须要大于等于声明的 Volume,才能进行绑定。
接下来,我们看一个 PV Controller 的一个实现。
PV Controller 中主要有两个实现逻辑:一个是 ClaimWorker;一个是 VolumeWorker。
ClaimWorker 实现的是 PVC 的状态迁移。
通过系统标签 "pv.kubernetes.io/bind-completed" 来标识一个 PVC 的状态。 如果该标签为 True,说明我们的 PVC 已经绑定完成,此时我们只需要去同步一些内部的状态; 如果该标签为 False,就说明我们的 PVC 处于未绑定状态。
这个时候就需要检查整个集群中的 PV 去进行筛选。通过 findBestMatch 就可以去筛选所有的 PV,也就是按照之前提到的五个绑定条件来进行筛选。如果筛选到 PV,就执行一个 Bound 操作,否则就去做一个 Provision 的操作,自己去创建一个 PV。
再看 VolumeWorker 的操作。它实现的则是 PV 的状态迁移。
通过 PV 中的 ClaimRef 标签来进行判断,如果该标签为空,就说明该 PV 是一个 Available 的状态,此时只需要做一个同步就可以了;如果该标签非空,这个值是 PVC 的一个值,我们就会去集群中查找对应的 PVC。如果存在该 PVC,就说明该 PV 处于一个 Bound 的状态,此时会做一些相应的状态同步;如果找不到该 PVC,就说明该 PV 处于一个绑定过的状态,相应的 PVC 已经被删掉了,这时 PV 就处于一个 Released 的状态。此时再根据 ReclaimPolicy 是否是 Delete 来决定是删掉还是只做一些状态的同步。

以上就是 PV Controller 的简要实现逻辑。
AD Controller
AD Controller 是 Attach/Detach Controller 的一个简称。
它有两个核心对象,即 DesiredStateofWorld 和 ActualStateOfWorld。 DesiredStateofWorld 是集群中预期要达到的数据卷的挂载状态; ActualStateOfWorld 则是集群内部实际存在的数据卷挂载状态。
它有两个核心逻辑,desiredStateOfWorldPopulator 和 Reconcile。 desiredStateOfWorldPopulator 主要是用来同步集群的一些数据以及 DSW、ASW 数据的更新,它会把集群里面,比如说我们创建一个新的 PVC、创建一个新的 Pod 的时候,我们会把这些数据的状态同步到 DSW 中; Reconcile 则会根据 DSW 和 ASW 对象的状态做状态同步。它会把 ASW 状态变成 DSW 状态,在这个状态的转变过程中,它会去执行 Attach、Detach 等操作。
下面这个表分别给出了 desiredStateOfWorld 以及 actualStateOfWorld 对象的一个具体例子。 desiredStateOfWorld 会对每一个 Worker 进行定义,包括 Worker 所包含的 Volume 以及一些试图挂载的信息; actualStateOfWorl 会把所有的 Volume 进行一次定义,包括每一个 Volume 期望挂载到哪个节点上、挂载的状态是什么样子的等等。
下图是 AD Controller 实现的逻辑框图。
从中我们可以看到,AD Controller 中有很多 Informer,Informer 会把集群中的 Pod 状态、PV 状态、Node 状态、PVC 状态同步到本地。
在初始化的时候会调用 populateDesireStateofWorld 以及 populateActualStateofWorld 将 desireStateofWorld、actualStateofWorld 两个对象进行初始化。
在执行的时候,通过 desiredStateOfWorldPopulator 进行数据同步,即把集群中的数据状态同步到 desireStateofWorld 中。reconciler 则通过轮询的方式把 actualStateofWorld 和 desireStateofWorld 这两个对象进行数据同步,在同步的时候,会通过调用 Volume Plugin 进行 attach 和 detach 操作,同时它也会调用 nodeStatusUpdater 对 Node 的状态进行更新。
以上就是 AD Controller 的简要实现逻辑。
Volume Manager
Volume Manager 实际上是 Kubelet 中一部分,是 Kubelet 中众多 Manager 的一个。它主要是用来做本节点 Volume 的 Attach/Detach/Mount/Unmount 操作。
它和 AD Controller 一样包含有 desireStateofWorld 以及 actualStateofWorld,同时还有一个 volumePluginManager 对象,主要进行节点上插件的管理。在核心逻辑上和 AD Controller 也类似,通过 desiredStateOfWorldPopulator 进行数据的同步以及通过 Reconciler 进行接口的调用。
这里我们需要讲一下 Attach/Detach 这两个操作:
之前我们提到 AD Controller 也会做 Attach/Detach 操作,所以到底是由谁来做呢?我们可以通过 "--enable-controller-attach-detach" 标签进行定义,如果它为 True,则由 AD Controller 来控制;若为 False,就由 Volume Manager 来做。
它是 Kubelet 的一个标签,只能定义某个节点的行为,所以如果假设一个有 10 个节点的集群,它有 5 个节点定义该标签为 False,说明这 5 个节点是由节点上的 Kubelet 来做挂载,而其它 5 个节点是由 AD Controller 来做挂载。
下图是 Volume Manager 实现逻辑图。
我们可以看到,最外层是一个循环,内部则是根据不同的对象,包括 desireStateofWorld, actualStateofWorld 的不同对象做一个轮询。
例如,对 actualStateofWorld 中的 MountedVolumes 对象做轮询,对其中的某一个 Volume,如果它同时存在于 desireStateofWorld,这就说明实际的和期望的 Volume 均是处于挂载状态,因此我们不会做任何处理。如果它不存在于 desireStateofWorld,说明期望状态中该 Volume 应该处于 Umounted 状态,就执行 UnmountVolume,将其状态转变为 desireStateofWorld 中相同的状态。
所以我们可以看到:实际上,该过程就是根据 desireStateofWorld 和 actualStateofWorld 的对比,再调用底层的接口来执行相应的操作,下面的 desireStateofWorld.UnmountVolumes 和 actualStateofWorld.AttachedVolumes 的操作也是同样的道理。
Volume Plugins
我们之前提到的 PV Controller、AD Controller 以及 Volume Manager 其实都是通过调用 Volume Plugin 提供的接口,比如 Provision、Delete、Attach、Detach 等去做一些 PV、PVC 的管理。而这些接口的具体实现逻辑是放在 VolumePlugin 中的
根据源码的位置可将 Volume Plugins 分为 In-Tree 和 Out-of-Tree 两类: In-Tree 表示源码是放在 Kubernetes 内部的,和 Kubernetes 一起发布、管理与迭代,缺点及时迭代速度慢、灵活性差; Out-of-Tree 类的 Volume Plugins 的代码独立于 Kubernetes,它是由存储商提供实现的,目前主要有 Flexvolume 和 CSI 两种实现机制,可以根据存储类型实现不同的存储插件。所以我们比较推崇 Out-of-Tree 这种实现逻辑。
从位置上我们可以看到,Volume Plugins 实际上就是 PV Controller、AD Controller 以及 Volume Manager 所调用的一个库,分为 In-Tree 和 Out-of-Tree 两类 Plugins。它通过这些实现来调用远端的存储,比如说挂载一个 NAS 的操作 "mount -t nfs * ",该命令其实就是在 Volume Plugins 中实现的,它会去调用远程的一个存储挂载到本地。
从类型上来看,Volume Plugins 可以分为很多种。In-Tree 中就包含了 几十种常见的存储实现,但一些公司的自己定义私有类型,有自己的 API 和参数,公共存储插件是无法支持的,这时就需要 Out-of-Tree 类的存储实现,比如 CSI、FlexVolume。
Volume Plugins 的具体实现会放到后面去讲。这里主要看一下 Volume Plugins 的插件管理。
Kubernetes会在 PV Controller、AD Controller 以及 Volume Manager 中来做插件管理。通过 VolumePlguinMg 对象进行管理。主要包含 Plugins 和 Prober 两个数据结构。
Plugins 主要是用来保存 Plugins 列表的一个对象,而 Prober 是一个探针,用于发现新的 Plugin,比如 FlexVolume、CSI 是扩展的一种插件,它们是动态创建和生成的,所以一开始我们是无法预知的,因此需要一个探针来发现新的 Plugin。
下图是插件管理的整个过程。
PV Controller、AD Controller 以及 Volume Manager 在启动的时候会执行一个 InitPlugins 方法来对 VolumePluginsMgr 做一些初始化。
它首先会将所有 In-Tree 的 Plugins 加入到我们的插件列表中。同时会调用 Prober 的 init 方法,该方法会首先调用一个 InitWatcher,它会时刻观察着某一个目录 (比如图中的 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/),当这个目录每生成一个新文件的时候,也就是创建了一个新的 Plugins,此时就会生成一个新的 FsNotify.Create 事件,并将其加入到 EventsMap 中;同理,如果删除了一个文件,就生成一个 FsNotify.Remove 事件加入到 EventsMap 中。
当上层调用 refreshProbedPlugins 时,Prober 就会把这些事件进行一个更新,如果是 Create,就将其添加到插件列表;如果是 Remove,就从插件列表中删除一个插件。
以上就是 Volume Plugins 的插件管理机制。
Kubernetes 存储卷调度
我们之前说到 Pod 必须被调度到某个 Worker 上才能去运行。在调度 Pod 时,我们会使用不同的调度器来进行筛选,其中有一些与 Volume 相关的调度器。例如 VolumeZonePredicate、VolumeBindingPredicate、CSIMaxVolumLimitPredicate 等。
VolumeZonePredicate 会检查 PV 中的 Label,比如 failure-domain.beta.kubernetes.io/zone 标签,如果该标签定义了 zone 的信息,VolumeZonePredicate 就会做相应的判断,即必须符合相应的 zone 的节点才能被调度。
比如下图左侧的例子,定义了一个 label 的 zone 为 cn-shenzhen-a。右侧的 PV 则定义了一个 nodeAffinity,其中定义了 PV 所期望的节点的 Label,该 Label 是通过 VolumeBindingPredicate 进行筛选的。
存储卷具体调度信息的实现可以参考《 从零开始入门 K8s | 应用存储和持久化数据卷:存储快照与拓扑调度 》,这里会有一个更加详细的介绍。
二、Flexvolume 介绍及使用
Flexvolume 是 Volume Plugins 的一个扩展,主要实现 Attach/Detach/Mount/Unmount 这些接口。我们知道这些功能本是由 Volume Plugins 实现的,但是对于某些存储类型,我们需要将其扩展到 Volume Plugins 以外,所以我们需要把接口的具体实现放到外面。
在下图中我们可以看到,Volume Plugins 其实包含了一部分 Flexvolume 的实现代码,但这部分代码其实只有一个 “Proxy”的功能。
比如当 AD Controller 调用插件的一个 Attach 时,它首先会调用 Volume Plugins 中 Flexvolume 的 Attach 接口,但这个接口只是把调用转到相应的 Flexvolume 的Out-Of-Tree实现上。
Flexvolume是可被 Kubelet 驱动的可执行文件,每一次调用相当于执行一次 shell 的 ls 这样的脚本,都是可执行文件的命令行调用,因此它不是一个常驻内存的守护进程。
Flexvolume 的 Stdout 作为 Kubelet 调用的返回结果,这个结果需要是 JSON 格式。
Flexvolume默认的存放地址为 "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/alicloud~disk/disk"。
下面是一个命令格式和调用的实例。
Flexvolume 的接口介绍
Flexvolum 包含以下接口: init : 主要做一些初始化的操作,比如部署插件、更新插件的时候做 init 操作,返回的时候会返回刚才我们所说的 DriveCapabilities 类型的数据结构,用来说明我们的 Flexvolume 插件有哪些功能; GetVolumeName : 返回插件名; Attach : 挂载功能的实现。根据 --enable-controller-attach-detach 标签来决定是由 AD Controller 还是 Kubelet 来发起挂载操作; WaitforAttach : Attach 经常是异步操作,因此需要等待挂载完成,才能需要进行下面的操作; MountDevice:它是 mount 的一部分。这里我们将 mount 分为 MountDevice 和 SetUp 两部分,MountDevice 主要做一些简单的预处理工作,比如将设备格式化、挂载到 GlobalMount 目录中等; GetPath :获取每个 Pod 对应的本地挂载目录; Setup :使用 Bind 方式将 GlobalPath 中的设备挂载到 Pod 的本地目录; TearDown 、 UnmountDevice 、 Detach 实现的是上面一些借口的逆过程; ExpandVolumeDevice :扩容存储卷,由 Expand Controller 发起调用; NodeExpand : 扩容文件系统,由 Kubelet 发起调用。

上面这些接口不一定需要全部实现,如果某个接口没有实现的话,可以将返回结果定义成: { "status" : "Not supported" , "message" : "error message" }
告诉调用者没有实现这个接口。此外,Volume Plugins 中的 Flexvolume 接口除了作为一个 Proxy 外,它也提供了一些默认实现,比如 Mount 操作。所以如果你的 Flexvolume 中没有定义该接口,该默认实现就会被调用。
在定义 PV 时可以通过 secretRef 字段来定义一些 secret 的功能。比如挂载时所需的用户名和密码,就可以通过 secretRef 传入。
Flexvolume 的挂载分析
从挂载流程和卸载流程两个方向来分析 Flexvolume 的挂载过程。
我们首先看 Attach 操作,它调用了一个远端的 API 把我们的 Storage 挂载到目标节点中的某个设备上去。然后通过 MountDevice 将本地设备挂载到 GlobalPath 中,同时也会做一些格式化这样的操作。Mount 操作(SetUp),它会把 GlobalPath 挂载 PodPath 中,PodPath 就是 Pod 启动时所映射的一个目录。
下图给出了一个例子,比如我们一个云盘,其 Volume ID 为 d-8vb4fflsonz21h31cmss,在执行完 Attach 和 WaitForAttach 操作之后,就会将其挂载到目标节点上的 /dec/vdc 设备中。执行 MountDevice 之后,就会把上述设备格式化,挂载到一个本地的 GlobalPath 中。而执行完 Mount 之后,就会将 GlobalPath 映射到 Pod 相关的一个子目录中。最后执行 Bind 操作,将我们的本地目录映射到容器中。这样完成一次挂载过程。
卸载流程就是一个逆过程。上述过程描述的是一个块设备的挂载过程,对于文件存储类型,就无需 Attach、MountDevice操作,只需要 Mount 操作,因此文件系统的 Flexvolume 实现较为简单,只需要 Mount 和 Unmount 过程即可。
Flexvolume 的代码示例
其中主要实现的是 init()、doMount()、doUnmount() 方法。在执行该脚本的时候对传入的参数进行判断来决定执行哪一个命令。

在 Github 上还有很多 Flexvolume 的示例,大家可以自行参考查阅。阿里云提供了一个 Flexvolume 的实现 ,有兴趣的可以参考一下。
Flexvolume 的使用
下图给出了一个 Flexvolume 类型的 PV 模板。它和其它模板实际上没有什么区别,只不过类型被定义为 flexVolume 类型。flexVolume 中定义了 driver、fsType、options。 driver 定义的是我们实现的某种驱动,比如图中的是 aliclound/disk,也可以是 aliclound/nas 等; fsType 定义的是文件系统类型,比如 "ext4"; options 包含了一些具体的参数,比如定义云盘的 id 等。
我们也可以像其它类型一样,通过 selector 中的 matchLabels 定义一些筛选条件。同样也可以定义一些相应的调度信息,比如定义 zone 为 cn-shenzhen-a。
下面是一个具体的运行结果。在 Pod 内部我们挂载了一个云盘,其所在本地设备为 /dev/vdb。通过 mount | grep disk 我们可以看到相应的挂载目录,首先它会将 /dev/vdb 挂载到 GlobalPath 中;其次会将 GlobalPath 通过 mount 命令挂载到一个 Pod 所定义的本地子目录中去;最后会把该本地子目录映射到 /data 上。
三、CSI 介绍及使用
和 Flexvolume 类似,CSI 也是为第三方存储提供数据卷实现的抽象接口。
有了 Flexvolume,为何还要 CSI 呢?

Flexvolume 只是给 kubernetes 这一个编排系统来使用的,而 CSI 可以满足不同编排系统的需求,比如 Mesos,Swarm。
其次 CSI 是容器化部署,可以减少环境依赖,增强安全性,丰富插件的功能。我们知道,Flexvolume 是在 host 空间一个二进制文件,执行 Flexvolum 时相当于执行了本地的一个 shell 命令,这使得我们在安装 Flexvolume 的时候需要同时安装某些依赖,而这些依赖可能会对客户的应用产生一些影响。因此在安全性上、环境依赖上,就会有一个不好的影响。
同时对于丰富插件功能这一点,我们在 Kubernetes 生态中实现 operator 的时候,经常会通过 RBAC 这种方式去调用 Kubernetes 的一些接口来实现某些功能,而这些功能必须要在容器内部实现,因此像 Flexvolume 这种环境,由于它是 host 空间中的二进制程序,就没法实现这些功能。而 CSI 这种容器化部署的方式,可以通过 RBAC 的方式来实现这些功能。
CSI 主要包含两个部分:CSI Controller Server 与 CSI Node Server。 Controller Server 是控制端的功能,主要实现创建、删除、挂载、卸载等功能; Node Server 主要实现的是节点上的 mount、Unmount 功能。
下图给出了 CSI 接口通信的描述。CSI Controller Server 和 External CSI SideCar 是通过 Unix Socket 来进行通信的,CSI Node Server 和 Kubelet 也是通过 Unix Socket 来通信,之后我们会讲一下 External CSI SiderCar 的具体概念。
下图给出了 CSI 的接口。主要分为三类:通用管控接口、节点管控接口、中心管控接口。 通用管控接口主要返回 CSI 的一些通用信息,像插件的名字、Driver 的身份信息、插件所提供的能力等; 节点管控接口的 NodeStageVolume 和 NodeUnstageVolume 就相当于 Flexvolume 中的 MountDevice 和 UnmountDevice。NodePublishVolume 和 NodeUnpublishVolume 就相当于 SetUp 和 TearDown 接口; 中心管控接口的 CreateVolume 和 DeleteVolume 就是我们的 Provision 和 Delete 存储卷的一个接口,ControllerPublishVolume 和 ControllerUnPublishVolume 则分别是 Attach 和 Detach 的接口。
CSI 的系统结构
CSI 是通过 CRD 的形式实现的,所以 CSI 引入了这么几个对象类型:VolumeAttachment、CSINode、CSIDriver 以及 CSI Controller Server 与 CSI Node Server 的一个实现。
在 CSI Controller Server 中,有传统的类似 Kubernetes 中的 AD Controller 和 Volume Plugins,VolumeAttachment 对象就是由它们所创建的。
此外,还包含多个 External Plugin组件,每个组件和 CSI Plugin 组合的时候会完成某种功能。比如: External Provisioner 和 Controller Server 组合的时候就会完成数据卷的创建与删除功能; External Attacher 和 Controller Server 组合起来可以执行数据卷的挂载和操作; External Resizer 和 Controller Server 组合起来可以执行数据卷的扩容操作; External Snapshotter 和 Controller Server 组合则可以完成快照的创建和删除。
CSI Node Server 中主要包含 Kubelet 组件,包括 VolumeManager 和 VolumePlugin,它们会去调用 CSI Plugin 去做 mount 和 unmount 操作;另外一个组件 Driver Registrar 主要实现的是 CSI Plugin 注册的功能。
以上就是 CSI 的整个拓扑结构,接下来我们将分别介绍不同的对象和组件。
CSI 对象
我们将介绍 3 种对象:VolumeAttachment,CSIDriver,CSINode。
VolumeAttachment 描述一个 Volume 卷在一个 Pod 使用中挂载、卸载的相关信息。例如,对一个卷在某个节点上的挂载,我们通过 VolumeAttachment 对该挂载进行跟踪。AD Controller 创建一个 VolumeAttachment,而 External-attacher 则通过观察该 VolumeAttachment,根据其状态来进行挂载和卸载操作。
下图就是一个 VolumeAttachment 的例子,其类别 (kind) 为 VolumeAttachment,spec 中指定了 attacher 为 ossplugin.csi.alibabacloud.com,即指定挂载是由谁操作的;指定了 nodeName 为 cn-zhangjiakou.192.168.1.53,即该挂载是发生在哪个节点上的;指定了 source 为 persistentVolumeName 为 oss-csi-pv,即指定了哪一个数据卷进行挂载和卸载。
status 中 attached 指示了挂载的状态,如果是 False, External-attacher 就会执行一个挂载操作。
第二个对象是 CSIDriver,它描述了集群中所部署的 CSI Plugin 列表,需要管理员根据插件类型进行创建。
例如下图中创建了一些 CSI Driver,通过 kuberctl get csidriver 我们可以看到集群里面创建的 3 种类型的 CSI Driver:一个是云盘;一个是 NAS;一个是 OSS。
在 CSI Driver 中,我们定义了它的名字,在 spec 中还定义了 attachRequired 和 podInfoOnMount 两个标签。 attachRequired 定义一个 Plugin 是否支持 Attach 功能,主要是为了对块存储和文件存储做区分。比如文件存储不需要 Attach 操作,因此我们将该标签定义为 False; podInfoOnMount 则是定义 Kubernetes 在调用 Mount 接口时是否带上 Pod 信息。
第三个对象是 CSINode,它是集群中的节点信息,由 node-driver-registrar 在启动时创建。它的作用是每一个新的 CSI Plugin 注册后,都会在 CSINode 列表里添加一个 CSINode 信息。
例如下图,定义了 CSINode 列表,每一个 CSINode 都有一个具体的信息(左侧的 YAML)。以 一 cn-zhangjiakou.192.168.1.49 为例,它包含一个云盘的 CSI Driver,还包含一个 NAS 的 CSI Driver。每个 Driver 都有自己的 nodeID 和它的拓扑信息 topologyKeys。如果没有拓扑信息,可以将 topologyKeys 设置为 "null"。也就是说,假如有一个有 10 个节点的集群,我们可以只定义一部分节点拥有 CSINode。
CSI 组件之 Node-Driver-Registrar
Node-Driver-Registrar 主要实现了 CSI Plugin 注册的一个机制。我们来看一下下图中的流程图。
第 1 步 ,在启动的时候有一个约定,比如说在 /var/lib/kuberlet/plugins_registry 这个目录每新加一个文件,就相当于每新加了一个 Plugin;
启动 Node-Driver-Registrar,它首先会向 CSI-Plugin 发起一个接口调用 GetPluginInfo,这个接口会返回 CSI 所监听的地址以及 CSI-Plugin 的一个 Driver name; 第 2 步 ,Node-Driver-Registrar 会监听 GetInfo 和 NotifyRegistrationStatus 两个接口; 第 3 步 ,会在 /var/lib/kuberlet/plugins_registry 这个目录下启动一个 Socket,生成一个 Socket 文件 ,例如:"diskplugin.csi.alibabacloud.com-reg.sock",此时 Kubelet 通过 Watcher 发现这个 Socket 后,它会通过该 Socket 向 Node-Driver-Registrar 的 GetInfo 接口进行调用。GetInfo 会把刚才我们所获得的的 CSI-Plugin 的信息返回给 Kubelet,该信息包含了 CSI-Plugin 的监听地址以及它的 Driver name; 第 4 步 ,Kubelet 通过得到的监听地址对 CSI-Plugin 的 NodeGetInfo 接口进行调用; 第 5 步 ,调用成功之后,Kubelet 会去更新一些状态信息,比如节点的 Annotations、Labels、status.allocatable 等信息,同时会创建一个 CSINode 对象; 第 6 步 ,通过对 Node-Driver-Registrar 的 NotifyRegistrationStatus 接口的调用告诉它我们已经把 CSI-Plugin 注册成功了。
通过以上 6 步就实现了 CSI Plugin 注册机制。
CSI 组件之 External-Attacher
External-Attacher 主要是通过 CSI Plugin 的接口来实现数据卷的挂载与卸载功能。它通过观察 VolumeAttachment 对象来实现状态的判断。VolumeAttachment 对象则是通过 AD Controller 来调用 Volume Plugin 中的 CSI Attacher 来创建的。CSI Attacher 是一个 In-Tree 类,也就是说这部分是 Kubernetes 完成的。
当 VolumeAttachment 的状态是 False 时,External-Attacher 就去调用底层的一个 Attach 功能;若期望值为 False,就通过底层的 ControllerPublishVolume 接口实现 Detach 功能。同时,External-Attacher 也会同步一些 PV 的信息在里面。
CSI 部署
我们现在来看一下块存储的部署情况。
之前提到 CSI 的 Controller 分为两部分,一个是 Controller Server Pod,一个是 Node Server Pod。
我们只需要部署一个 Controller Server,如果是多备份的,可以部署两个。Controller Server 主要是通过多个外部插件来实现的,比如说一个 Pod 中可以定义多个 External 的 Container 和一个包含 CSI Controller Server 的 Container,这时候不同的 External 组件会和 Controller Server 组成不同的功能。
而 Node Server Pod 是个 DaemonSet,它会在每个节点上进行注册。Kubelet 会直接通过 Socket 的方式直接和 CSI Node Server 进行通信、调用 Attach/Detach/Mount/Unmount 等。
Driver Registrar 只是做一个注册的功能,会在每个节点上进行部署。
文件存储和块存储的部署情况是类似的。只不过它会把 Attacher 去掉,也没有 VolumeAttachment 对象。
CSI 使用示例
和 Flexvolume 一样,我们看一下它的定义模板。
可以看到,它和其它的定义并没什么区别。主要的区别在于类型为 CSI,里面会定义 driver,volumeHandle,volumeAttribute,nodeAffinity 等。 driver 就是定义是由哪一个插件来去实现挂载; volumeHandle 主要是指示 PV 的唯一标签; volumeAttribute 用于附加参数,比如 PV 如果定义的是 OSS,那么就可以在 volumeAttribute 定义 bucket、访问的地址等信息在里面; nodeAffinity 则可以定义一些调度信息。与 Flexvolume 类似,还可以通过 selector 和 Label 定义一些绑定条件。
中间的图给出了一个动态调度的例子,它和其它类型的动态调度是一样的。只不过在定义 provisioner 的时候指定了一个 CSI 的 provisioner。
下面给出了一个具体的挂载例子。
Pod 启动之后,我们可以看到 Pod 已经把一个 /dev/vdb 挂载到 /data 上了。同理,它有一个 GlobalPath 和一个 PodPath 的集群在里面。我们可以把一个 /dev/vdb 挂载到一个 GlobalPath 里面,它就是一个 CSI 的一个 PV 在本节点上唯一确定的目录。一个 PodPath 就是一个 Pod 所确定的一个本地节点的目录,它会把 Pod 所对应的目录映射到我们的容器中去。
CSI 的其它功能
除了挂载、卸载之外,CSI 化提供了一些附加的功能。例如,在定义模板的时候往往需要一些用户名和密码信息,此时我们就可通过 Secret 来进行定义。之前我们所讲的 Flexvolume 也支持这个功能,只不过 CSI 可以根据不同的阶段定义不同的 Secret 类型,比如挂载阶段的 Secret、Mount 阶段的 Secret、Provision 阶段的 Secret。
Topology 是一个拓扑感知的功能。当我们定义一个数据卷的时候,集群中并不是所有节点都能满足该数据卷的需求,比如我们需要挂载不同的 zone 的信息在里面,这就是一个拓扑感知的功能。这部分在第 10 讲已有详细的介绍,大家可以进行参考。
Block Volume 就是 volumeMode 的一个定义,它可以定义成 Block 类型,也可以定义成文件系统类型,CSI 支持 Block 类型的 Volume,就是说挂载到 Pod 内部时,它是一个块设备,而不是一个目录。
Skip Attach 和 PodInfo On Mount 是刚才我们所讲过的 CSI Driver 中的两个功能。
CSI 的近期 Features
CSI 还是一个比较新的实现方式。近期也有了很多更新,比如 ExpandCSIVolumes 可以实现文件系统扩容的功能;VolumeSnapshotDataSource 可以实现数据卷的快照功能;VolumePVCDataSource 实现的是可以定义 PVC 的数据源;我们以前在使用 CSI 的时候只能通过 PVC、PV 的方式定义,而不能直接在 Pod 里面定义 Volume,CSIInlineVolume 则可以让我们可以直接在 Volume 中定义一些 CSI 的驱动。
阿里云在 GitHub 上开源了 CSI 的实现 ,大家有兴趣的可以看一下,做一些参考。
四、本文总结
本文主要介绍了 Kubernetes 集群中存储卷相关的知识,主要有以下三点内容: 第一部分讲述了 Kubernetes 存储架构,主要包括存储卷概念、挂载流程、系统组件等相关知识; 第二部分讲述了 Flexvolume 插件的实现原理、部署架构、使用示例等; 第三部分讲述了 CSI 插件的实现原理、资源对象、功能组件、使用示例等;
希望上述知识点能让各位同学有所收获,特别是在处理存储卷相关的设计、开发、故障处理等方面有所帮助。

查看更多:https://yq.aliyun.com/articles/743613?utm_content=g_1000103097

上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-10 15:36:00
# default_exp digdata # 上面一行用于nbdev中声明本模块的名称。必须是notebook的第一个Cell的第一行。
digdata 描述:抗击新冠病毒(3)-探索在线数据资源 功能:本页面用于交互式地探索数据。通过访问网络获取数据,分析和理解网页数据结构,转换为列表格式,用于后续的分析和绘图输出。 模块:使用JupyterLab、Python、nbdev等完成。用到的Python模块包括: requests,访问web服务网站。 re,正则表达式解析。 json,JSON格式解析。 BeautifulSoup,HTML格式解析。 pprint,格式化输出。 pandas,数据表格分析。 源码-https://github.com/openthings/anti2020ncov 参考: JupyterLab-数据实验室 文学式编程-nbdev入门教程 抗击新冠病毒(1)-开源软件与数据项目 抗击新冠病毒(2)-基于Jupyter+nbdev的数据分析 #hide from nbdev.showdoc import * #export from bs4 import BeautifulSoup from parser import * #regex_parser import re import json import time import logging import datetime import requests import pprint
获取网页数据 #export #url = "https://3g.dxy.cn/newh5/view/pneumonia" url = "https://ncov.dxy.cn/ncovh5/view/pneumonia?from=singlemessage&isappinstalled=0" headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36' } #export session = requests.session() session.headers.update(headers) r = session.get(url) #export #pprint.pprint(r.text) #export #soup = BeautifulSoup(r.content, 'lxml') #soup
提取特定的数据域 # export # 分为总体情况、分省情况、省内各市情况、新闻四大类。 overall_information = re.search(r'\{("id".*?)\}', str(soup.find('script', attrs={'id': 'getStatisticsService'}))) province_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getListByCountryTypeService1'}))) area_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getAreaStat'}))) news_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getTimelineService'})))
1、总体情况 #pprint.pprint(overall_information.string) #overall_information.group(0) #jsall = json.loads(overall_information.group(0))
def overall_parser(overall_information): overall_information = json.loads(overall_information.group(0)) overall_information.pop('id') overall_information.pop('createTime') overall_information.pop('modifyTime') overall_information.pop('imgUrl') overall_information.pop('deleted') overall_information['countRemark'] = overall_information['countRemark'].replace(' 疑似', ',疑似').replace(' 治愈', ',治愈').replace(' 死亡', ',死亡').replace(' ', '') #overall_information = json.loads(overall_information.group(0))
2、分省情况 #provinces = json.loads(province_information.group(0)) #provinces
def province_parser(province_information): provinces = json.loads(province_information.group(0)) crawl_timestamp = "" for province in provinces: province.pop('id') province['comment'] = province['comment'].replace(' ', '') province['crawlTime'] = crawl_timestamp #province['country'] = country_type.get(province['countryType']) province['tags'] = province['tags'].replace(' ', '') province = regex_parser(content=province, key='tags') #for province in provinces: # print(province['id'],'\t',province['provinceShortName'],'\t',province['tags'])
3、省内各市县情况 #area_information.string area = json.loads(area_information.group(0)) print("省份\t确诊\t疑似\t治愈\t死亡") for a in area: print(a['provinceName'],'\t',a['confirmedCount'],'\t',a['suspectedCount'],'\t',a['curedCount'],'\t',a['deadCount'])
按省提取城市情况 cities = area[0]['cities'] #cities print("城市\t确诊\t疑似\t治愈\t死亡") for p in area: cities = p['cities'] print("===================================") print(p['provinceName'],'\t',p['confirmedCount'],'\t',p['suspectedCount'],'\t',p['curedCount'],'\t',p['deadCount']) print("-----------------------------------") for c in cities: print(c['cityName'],'\t',c['confirmedCount'],'\t',c['suspectedCount'],'\t',c['curedCount'],'\t',c['deadCount'])
4、新闻列表 news = json.loads(news_information.group(0)) #news for n in news: print(n['id'],'\t',n['infoSource'].strip(),'\t',n['title'].strip())#,n['summary'].strip())
nbdev 适用工具 # 将notebook转化为python的*.py代码,保存到项目名称的子目录中。 from nbdev.export import * notebook2script() Converted 00_digdata.ipynb. Converted 01_getdata.ipynb. Converted 10_charts.ipynb. Converted 10_china.ipynb. Converted index.ipynb. help(notebook2script) Help on function notebook2script in module nbdev.export: notebook2script(fname=None, silent=False, to_dict=False) Convert notebooks matching `fname` to modules
大数据
2020-02-10 12:39:00
直播主题:
智能测温及社区防疫监控解决方案
直播时间:
2月10日 10:00-10:30
讲师:
岑参,阿里云智能IoT解决方案架构师七年医疗行业数字化咨询经验,曾任职于国内头部IT公司,现负责IOT医疗行业
适合观众:
政务大厅管理人员、车站交通枢纽管理人员、医院管理人员、一般居民用户
内容简介:
热成像人体测温方案适用于人群聚集区域检测疫情防控,利用红外非接触式体温检测,可实现快速体温筛查,远距离、大面积检测,自动预警;智能体温远程监控方案可部署于医院新开设的隔离病区或居民家庭,能很大程度减少医患间因测量基础生命体征而发生的接触,提高效率降低被感染风险。

直播主题:
金融行业钉钉组织健康守护方案
直播时间:
2月10日 14:00-14:50
讲师:
七玉,钉钉金融行业运营专家十年企业数字化咨询、运营经验,曾任职于国际、国内头部IT公司,现负责钉钉金融行业
适合观众:
金融行业(银行、保险、证券等)IT负责人、人事经理、办公室主任等
内容简介:
应对疫情期间,各组织启用远程办公的需求,阿里巴巴钉钉紧急推出组织健康方案,免费推出员工健康打卡服务以及异地办公工具,帮助金融机构守护组织成员健康的同时实现数智化远程协同。本次课程分为两期,第一期:精准通知、智能守护。包括钉钉健康打卡、紧急通知、视频会议三个核心场景。

在线看大会,就来云栖号! 每天都有行业专家分享!请访问: https://yqh.aliyun.com/zhibo
大数据
2020-02-10 10:33:00
一、安装准备
本次安装的版本是截止2020.1.30最新的版本0.17.0
软件要求 需要**Java 8(8u92 +)**以上的版本,否则会有问题 Linux,Mac OS X或其他类似Unix的操作系统(不支持Windows)
硬件要求
Druid包括一组参考配置和用于单机部署的启动脚本: nano-quickstart micro-quickstart small medium large xlarge
单服务器参考配置
Nano-Quickstart:1个CPU,4GB RAM 启动命令: bin/start-nano-quickstart 配置目录: conf/druid/single-server/nano-quickstart
微型快速入门:4个CPU,16GB RAM 启动命令: bin/start-micro-quickstart 配置目录: conf/druid/single-server/micro-quickstart
小型:8 CPU,64GB RAM(〜i3.2xlarge) 启动命令: bin/start-small 配置目录: conf/druid/single-server/small
中:16 CPU,128GB RAM(〜i3.4xlarge) 启动命令: bin/start-medium 配置目录: conf/druid/single-server/medium
大型:32 CPU,256GB RAM(〜i3.8xlarge) 启动命令: bin/start-large 配置目录: conf/druid/single-server/large
大型X:64 CPU,512GB RAM(〜i3.16xlarge) 启动命令: bin/start-xlarge 配置目录: conf/druid/single-server/xlarge
我们这里做测试使用选择最低配置即可 nano-quickstart
二、下载安装包
访问官网:
http://druid.io/现在也会跳转https://druid.apache.org/
或者直接访问 https://druid.apache.org/
点击download进入下载页面:
选择最新版本: apache-druid-0.17.0-bin.tar.gz 进行下载
200多M
也可以选择下载源码包 用maven进行编译
三、安装
上传安装包
在终端中运行以下命令来安装Druid: tar -xzf apache-druid-0.17.0-bin.tar.gz cd apache-druid-0.17.0
安装包里有这几个目录:
LICENSE 和 NOTICE 文件 bin/* -脚本 conf/* -单服务器和集群设置的示例配置 extensions/* -扩展 hadoop-dependencies/* -Druid Hadoop依赖 lib/* -Druid库 quickstart/* -快速入门教程的配置文件,样本数据和其他文件
配置文件 #进入我们要启动的配置文件位置: cd conf/druid/single-server/nano-quickstart/
_common 公共配置
是druid一些基本的配置,比如元数据库地址 各种路径等等
其他的是各个节点的配置
比较类似,比如broker cd broker/
jvm配置
main配置
runtime运行时相关的配置
回到主目录
启动的conf在 cd conf/supervise/single-server
里面是不同配置启动不同的脚本
四、启动
回到主目录 ./bin/start-nano-quickstart
启动成功:
访问
localhost:8888
看到管理页面
如果要修改端口,需要修改配置的端口和主目录下的 vi bin/verify-default-ports
五、加载数据
Druid提供了一个示例数据文件,其中包含2015年9月12日发生的Wiki的示例数据。
此样本数据位于 quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz
示例数据大概是这样: { "timestamp":"2015-09-12T20:03:45.018Z", "channel":"#en.wikipedia", "namespace":"Main", "page":"Spider-Man's powers and equipment", "user":"foobar", "comment":"/* Artificial web-shooters */", "cityName":"New York", "regionName":"New York", "regionIsoCode":"NY", "countryName":"United States", "countryIsoCode":"US", "isAnonymous":false, "isNew":false, "isMinor":false, "isRobot":false, "isUnpatrolled":false, "added":99, "delta":99, "deleted":0, }
Druid加载数据分为以下几种: 加载文件 从kafka中加载数据 从hadoop中加载数据 自定义加载方式
我们这样演示一下加载示例文件数据
1、进入localhost:8888 点击load data
2、选择local disk
3、选择Connect data
4、预览数据
Base directory输入quickstart/tutorial/
File filter输入 wikiticker-2015-09-12-sampled.json.gz
然后点击apply预览 就可以看见数据了 点击Next:parse data解析数据
5、解析数据
可以看到json数据已经被解析了 继续解析时间
6、解析时间
解析时间成功 之后两步是transform和filter 这里不做演示了 直接next
7、确认Schema
这一步会让我们确认Schema 可以做一些修改
由于数据量较小 我们直接关掉Rollup 直接下一步
8、设置分段
这里可以设置数据分段 我们选择hour next
9、确认发布

10、发布成功 开始解析数据
等待任务成功
11、查看数据
选择datasources 可以看到我们加载的数据
可以看到数据源名称 Fully是完全可用 还有大小等各种信息
12、查询数据
点击query按钮
我们可以写sql查询数据了 还可以将数据下载
Druid相关博文
什么是Druid
静下心来,努力的提升自己,永远都没有错。更多实时计算相关博文,欢迎关注实时流式计算
大数据
2020-02-10 09:06:00
Apache Flink社区宣布Flink 1.10.0正式发布!
本次Release版本修复1.2K个问题,对Flink作业的整体性能和稳定性做了重大改进,同时增加了对K8S,Python的支持。
这个版本标志着与Blink集成的完成,并且强化了流式SQL与Hive的集成,本文将详细介绍新功能和主要的改进。
一、内存管理优化
原有TaskExecutor 有一些缺点:
流处理和批处理用了不同的配置模型;
流处理的堆外配置RocksDB复杂,需要用户配置;
为了使内存管理更明确直观,Flink 1.10对TaskExecutor内存模型和配置做了重大改进,这个更改使FLink更适合于各种部署环境:K8S,Yarn,Mesos。
这种更改统一了入口点,使得下游框架比如zeppelin的编程更加容易。
二、集成Kubernetes
这对于想要在容器中使用Flink的用户是一个非常好的消息。
在Flink1.10中推出了 Active Kubernetes集成
Flink的ResourceManager( K8sResMngr )与Kubernetes进行本地通信以按需分配新的Pod,类似于Flink的Yarn和Mesos集成。用户还可以利用命名空间为聚合资源消耗有限的多租户环境启动Flink集群。事先配置具有足够权限的RBAC角色和服务帐户。
用户可以简单地参考Kubernetes配置选项,然后使用以下命令在CLI中将作业提交到Kubernetes上的现有Flink会话: ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar
三、集成Hive
Flink 1.10通过开发将Hive集成到Flink,可用于生产环境。
并且支持大部分Hive版本,Flink支持Hive版本列表: 1.0 1.0.0 1.0.1 1.1 1.1.0 1.1.1 1.2 1.2.0 1.2.1 1.2.2 2.0 2.0.0 2.0.1 2.1 2.1.0 2.1.1 2.2 2.2.0 2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6 3.1 3.1.0 3.1.1 3.1.2
需要引入依赖 org.apache.flink flink-connector-hive_2.11 1.10.0 provided org.apache.flink flink-table-api-java-bridge_2.11 1.10.0 provided org.apache.hive hive-exec ${hive.version} provided
连接Hive代码 val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val name = "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version) tableEnv.registerCatalog("myhive", hive) // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive")
四、PyFlink:支持UDF
从Flink 1.10开始,PyFlink开始支持UDF函数。
用户还可以 pip 使用以下方法轻松安装PyFlink : pip install apache-flink

五、其他重要变化 Flink现在可以编译并在Java 11上运行。 一个新的Elasticsearch sink,完全支持Elasticsearch 7.x版本。 Kafka 0.8 和 0.9 版本已经被废,不再支持。 删除了非认证网络流量配置选项taskmanager.network.credit.model。 删除了旧版Web UI。
六、贡献者名单
最后我们看一下贡献者的名单,有很多国内大神的身影
Achyuth Samudrala, Aitozi, Alberto Romero, Alec.Ch, Aleksey Pak, Alexander Fedulov, Alice Yan, Aljoscha Krettek, Aloys, Andrey Zagrebin, Arvid Heise, Benchao Li, Benoit Hanotte, Benoît Paris, Bhagavan Das, Biao Liu, Chesnay Schepler, Congxian Qiu, Cyrille Chépélov, César Soto Valero, David Anderson, David Hrbacek, David Moravek, Dawid Wysakowicz, Dezhi Cai, Dian Fu, Dyana Rose, Eamon Taaffe, Fabian Hueske, Fawad Halim, Fokko Driesprong, Frey Gao, Gabor Gevay, Gao Yun, Gary Yao, GatsbyNewton, GitHub, Grebennikov Roman, GuoWei Ma, Gyula Fora, Haibo Sun, Hao Dang, Henvealf, Hongtao Zhang, HuangXingBo, Hwanju Kim, Igal Shilman, Jacob Sevart, Jark Wu, Jeff Martin, Jeff Yang, Jeff Zhang, Jiangjie (Becket) Qin, Jiayi, Jiayi Liao, Jincheng Sun, Jing Zhang, Jingsong Lee, JingsongLi, Joao Boto, John Lonergan, Kaibo Zhou, Konstantin Knauf, Kostas Kloudas, Kurt Young, Leonard Xu, Ling Wang, Lining Jing, Liupengcheng, LouisXu, Mads Chr. Olesen, Marco Zühlke, Marcos Klein, Matyas Orhidi, Maximilian Bode, Maximilian Michels, Nick Pavlakis, Nico Kruber, Nicolas Deslandes, Pablo Valtuille, Paul Lam, Paul Lin, PengFei Li, Piotr Nowojski, Piotr Przybylski, Piyush Narang, Ricco Chen, Richard Deurwaarder, Robert Metzger, Roman, Roman Grebennikov, Roman Khachatryan, Rong Rong, Rui Li, Ryan Tao, Scott Kidder, Seth Wiesman, Shannon Carey, Shaobin.Ou, Shuo Cheng, Stefan Richter, Stephan Ewen, Steve OU, Steven Wu, Terry Wang, Thesharing, Thomas Weise, Till Rohrmann, Timo Walther, Tony Wei, TsReaper, Tzu-Li (Gordon) Tai, Victor Wong, WangHengwei, Wei Zhong, WeiZhong94, Wind (Jiayi Liao), Xintong Song, XuQianJin-Stars, Xuefu Zhang, Xupingyong, Yadong Xie, Yang Wang, Yangze Guo, Yikun Jiang, Ying, YngwieWang, Yu Li, Yuan Mei, Yun Gao, Yun Tang, Zhanchun Zhang, Zhenghua Gao, Zhijiang, Zhu Zhu, a-suiniaev, azagrebin, beyond1920, biao.liub, blueszheng, bowen.li, caoyingjie, catkint, chendonglin, chenqi, chunpinghe, cyq89051127, danrtsey.wy, dengziming, dianfu, eskabetxe, fanrui, forideal, gentlewang, godfrey he, godfreyhe, haodang, hehuiyuan, hequn8128, hpeter, huangxingbo, huzheng, ifndef-SleePy, jiemotongxue, joe, jrthe42, kevin.cyj, klion26, lamber-ken, libenchao, liketic, lincoln-lil, lining, liuyongvs, liyafan82, lz, mans2singh, mojo, openinx, ouyangwulin, shining-huang, shuai-xu, shuo.cs, stayhsfLee, sunhaibotb, sunjincheng121, tianboxiu, tianchen, tianchen92, tison, tszkitlo40, unknown, vinoyang, vthinkxie, wangpeibin, wangxiaowei, wangxiyuan, wangxlong, wangyang0918, whlwanghailong, xuchao0903, xuyang1706, yanghua, yangjf2019, yongqiang chai, yuzhao.cyz, zentol, zhangzhanchum, zhengcanbin, zhijiang, zhongyong jin, zhuzhu.zz, zjuwangg, zoudaokoulife, 砚田, 谢磊, 张志豪, 曹建华
Flink系列文章:
Flink入门(一)——Apache Flink介绍 Flink入门(二)——Flink架构介绍
Flink入门(三)——环境与部署
Flink入门(四)——编程模型
更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算
大数据
2020-02-13 09:39:00
2020年到了,祝大家新年快乐!
2020年是一个闰年(Leap Year),闰年是会出故障的。八年前,2012年2月29日,我在Azure的时候我们就出了一个大故障:
https://azure.microsoft.com/en-us/blog/summary-of-windows-azure-service-disruption-on-feb-29th-2012/
常见的错误认知
1、 一年总是365天
2、2月总是28天
3、闰年是每四年一次
其实,闰年并不是每四年一次。2000是闰年,但1900年和2100都不是闰年。
哪里容易出闰年相关的Bug
1、在一个日期值上加或减时间的代码。尤其是加减1年或1个月的代码
2、各种根据数据库查询结果生成的报表和图标,月度和年度统计可能会少算1天
3、证书/密码/密钥/缓存 等的过期时间,可能会比预期的早了一天,或者可能设定了一个非法的过期时间
4、固定长度的数组。例如,一个长度为365的数组遇到闰年可能就不够了,可能会数组越界。
5、UI组件,例如日历、日期选择组件,以及客户端输入校验相关的代码。
闰年的哪些日子要特别注意
2019年12月31日:这是闰年前一年的最后一天。2019年的最后一天加365天,并不是2020年的最后一天,而会是2020年的倒数第二天(即2020年12月30日)。
2020年1月1日:闰年的第一天。闰年的第一天加365天,并不是下一年的1月1日,而是今年的12月31日。
2020年1月31日:这一天加28天,并不是下个月(2月)的最后一天。
2020年2月1日:这一天加28天,并不是下个月(3月)的第一天。
2020年2月28日:这是2月29日的前一天。有问题的代码可能会错误的把这天当成2月的最后一天,试图加1天得到3月1日。但实际上这一天加1天是2月29日。
2020年2月29日:这是闰年多出来的一天。如果代码以为2月总是只有28天,那代码可能出现各种问题,例如:
入参校验会认为一个合法输入(2020/2/29)是非法的,用 { year+1 , month , day } 的方式来加减1年的话会产生一个非法日期。
2020年3月1日:2月29日后面的那天。代码如果在3月1日上减28天,会得到2月2日(而不是预期中的2月1日);减365天的话会得到2019年3月2日(而不是预期中的3月1日)。
2020年12月31日:一年的第366天。
代码如果不能正确处理一年的第366天,可能也会导致问题。例如,2008年12月31日,第三方软件中的问题导致了所有Microsoft Zune设备无法使用,详情参考:
http://www.theguardian.com/technology/blog/2009/jan/01/zune-firmware-mistake
代码如果假设1年永远是365天,声明了一个固定大小为365的数组,那在一年的第366天可能会发生数组越界。
数组越界如果发生在 C/C++ 语言编写的代码里,可能导致内存溢出攻击漏洞。

查看更多:https://yq.aliyun.com/articles/742802?utm_content=g_1000103478
上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-12 15:13:00
2020.01.02 新年伊始,Nacos Star 突破 10000,从此迈上了一个新的里程碑。感谢大家的一路支持、信任和帮助!!!
Nacos 开源 17 个月以来,发布了 22 个版本,成功切入 Dubbo/Spring-Cloud/ 云原生三个核心生态。吸引了 88 位优秀贡献者,积累了 110 家企业案例,官网累计获取 20w+ 用户浏览, 2000 UV ,借此机会,我们代表 Nacos 社区一起回顾 Nacos 来时的路,和未来的发展方向。
项目起源
Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十一的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。随着云计算兴起,2018 年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型!
开源后的工作
开源很重要的是生态,而且开发者往往是先选服务框架,再选注册中心和配置中心,因此在 1.0 之前 Nacos 首先支持了国内人气最高的 Dubbo/Spring-Cloud 两个主流服务框架,又在 1.X 版本之后支持了云原生的服务框架。至此 Nacos 目前已经能够支持所有主流服务框架,并且为用户未来平滑迁移云原生服务框架做好了准备!
虽然我们期望通过云原生的方式支持多语言,但是也为 Java/Golang/NodeJs/Cpp/Python 等提供了语言级支持,以便给大家更好的编程体验!
后续规划
2020 年,我们将聚焦 Nacos 内核构建,打造一个更稳定、更安全、更高效的微服务引擎!
目前最核心的工作如下: 建立访问控制体系,提升安全水准 升级连接通道,提升推送效率 解耦Mysql,降低部署运维成本


查看更多:https://yq.aliyun.com/articles/742637?utm_content=g_1000103477
上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/
大数据
2020-02-12 15:10:00
消息队列常见面试问题小集合

一、为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?
面试官心理分析
其实面试官主要是想看看:
第一 ,你知不知道你们系统里为什么要用消息队列这个东西?
不少候选人,说自己项目里用了 Redis、MQ,但是其实他并不知道自己为什么要用这个东西。其实说白了,就是为了用而用,或者是别人设计的架构,他从头到尾都没思考过。
没有对自己的架构问过为什么的人,一定是平时没有思考的人,面试官对这类候选人印象通常很不好。因为面试官担心你进了团队之后只会木头木脑的干呆活儿,不会自己思考。
第二 ,你既然用了消息队列这个东西,你知不知道用了有什么好处&坏处?
你要是没考虑过这个,那你盲目弄个 MQ 进系统里,后面出了问题你是不是就自己溜了给公司留坑?你要是没考虑过引入一个技术可能存在的弊端和风险,面试官把这类候选人招进来了,基本可能就是挖坑型选手。就怕你干 1 年挖一堆坑,自己跳槽了,给公司留下无穷后患。
第三 ,既然你用了 MQ,可能是某一种 MQ,那么你当时做没做过调研?
你别傻乎乎的自己拍脑袋看个人喜好就瞎用了一个 MQ,比如 Kafka,甚至都从没调研过业界流行的 MQ 到底有哪几种。每一个 MQ 的优点和缺点是什么。每一个 MQ 没有绝对的好坏 ,但是就是看用在哪个场景可以 扬长避短,利用其优势,规避其劣势 。
如果是一个不考虑技术选型的候选人招进了团队,leader 交给他一个任务,去设计个什么系统,他在里面用一些技术,可能都没考虑过选型,最后选的技术可能并不一定合适,一样是留坑。
面试题剖析
为什么使用消息队列
其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么?
面试官问你这个问题, 期望的一个回答 是说,你们公司有个什么 业务场景 ,这个业务场景有个什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。
先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个: 解耦 、 异步 、 削峰 。
解耦
看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃......
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!
如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
总结 :通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
面试技巧 :你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。
异步
再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。
如果 使用 MQ ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!
削峰
每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。
一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。
但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。
如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。
消息队列有什么优缺点
优点上面已经说了,就是 在特殊场景下有其对应的好处 , 解耦 、 异步 、 削峰 。
缺点有以下几个:
1.系统可用性降低
系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?
2.系统复杂度提高
硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
3.一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;综上,各种对比之后,有如下建议:
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司,会去用 RocketMQ,确实很不错(阿里出品),但社区可能有突然黄掉的风险,对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
所以 中小型公司 ,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择; 大型公司 ,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是 大数据领域 的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

二、如何保证消息队列的高可用?
面试官心理分析
如果有人问到你 MQ 的知识,高可用是必问的。 上一讲提到,MQ 会导致系统可用性降低。 所以只要你用了 MQ,接下来问的一些要点肯定就是围绕着 MQ 的那些缺点怎么来解决了。
要是你傻乎乎的就干用了一个 MQ,各种问题从来没考虑过,那你就杯具了,面试官对你的感觉就是,只会简单使用一些技术,没任何思考,马上对你的印象就不太好了。 这样的同学招进来要是做个 20k 薪资以内的普通小弟还凑合,要是做薪资 20k+ 的高工,那就惨了,让你设计个系统,里面肯定一堆坑,出了事故公司受损失,团队一起背锅。
面试题剖析
这个问题这么问是很好的,因为不能问你 Kafka 的高可用性怎么保证? ActiveMQ 的高可用性怎么保证? 一个面试官要是这么问就显得很没水平,人家可能用的就是 RabbitMQ,没用过 Kafka,你上来问人家 Kafka 干什么? 这不是摆明了刁难人么。
所以有水平的面试官,问的是 MQ 的高可用性怎么保证? 这样就是你用过哪个 MQ,你就说说你对那个 MQ 的高可用性的理解。
RabbitMQ 的高可用性
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。
RabbitMQ 有三种模式: 单机模式、普通集群模式、镜像集群模式。
单机模式
单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。
普通集群模式(无高可用性)
普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。 你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。 你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。 因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式(高可用性)
这种模式,才是所谓的 RabbitMQ 的高可用模式。 跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。 然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。


那么如何开启这个镜像集群模式呢? 其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。 坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重! 第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。 你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?
Kafka 的高可用性
Kafka 一个最基本的架构认识: 由多个 broker 组成,每个 broker 是一个节点; 你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。
这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。
Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。 但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。 每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。 所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。 写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。 只能读写 leader? 很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。 Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的。 如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。 这就有所谓的高可用性了。
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。 一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。 (当然,这只是其中一种模式,还可以适当调整这个行为)
消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧? 不至于一无所知,现场还能给面试官画画图。 要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。 三、如何保证消息不被重复消费? 或者说,如何保证消息消费的幂等性?
面试官心理分析
其实这是很常见的一个问题,这俩问题基本可以连起来问。 既然是消费消息,那肯定要考虑会不会重复消费? 能不能避免重复消费? 或者重复消费了也别造成系统异常可以吗? 这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。
面试题剖析
回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。 因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。 挑一个 Kafka 来举个例子,说说怎么重复消费吧。
Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。 这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。 重启之后,少数消息会再次消费一次。
举个栗子。
有这么个场景。 数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。 消费者从 kafka 去消费的时候,也是按照这个顺序去消费。 假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。 那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。 那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。 由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
举个例子吧。 假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了? 但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
所以第二个问题来了,怎么保证消息队列消费的幂等性?
其实还是得结合业务来思考,我这里给几个思路: 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗? 如果没有消费过,你就处理,然后这个 id 写 Redis。 如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。 比如基于数据库的唯一键来保证重复数据不会重复插入多条。 因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。


当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。 四、如何保证消息的可靠性传输? 或者说,如何处理消息丢失的问题?
面试官心理分析
这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费和幂等性问题。 不能少,就是说这数据别搞丢了。 那这个问题你必须得考虑一下。
如果说你这个是用 MQ 来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢。
面试题剖析
数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。
RabbitMQ

生产者弄丢了数据
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息; 如果收到了消息,那么可以提交事务channel.txCommit。 // 开启事务 channel.txSelect try { // 这里发送消息 } catch ( Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommitCopy to clipboardErrorCopied
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。 如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。 而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
RabbitMQ 弄丢了数据
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。 除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤: 创建 queue 的时候将其设置为持久化 这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。 第二个是发送消息的时候将消息的 deliveryMode 设置为 2 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。 必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。
消费端弄丢了数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。 这样的话,如果你还没处理完,不就没有 ack 了? 那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
Kafka
消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。 但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。 然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。
Kafka 弄丢了数据
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。 大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据? 这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数: 给 topic 设置 replication.factor 参数: 这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 在 Kafka 服务端设置 min.insync.replicas 参数: 这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。 在 producer 端设置 acks=all: 这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思): 这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。 如果没满足这个条件,生产者会自动不断的重试,重试无限次。 五、如何保证消息的顺序性?
面试官心理分析
其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿? 第二看看你有没有办法保证消息是有顺序的? 这是生产系统中常见的问题。
面试题剖析
我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。 常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。
你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧? 不然本来是: 增加、修改、删除; 你愣是换了顺序给执行成删除、修改、增加,不全错了么。
本来这个数据同步过来,应该最后这个数据被删除了; 结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
先看看顺序会错乱的俩场景:
RabbitMQ: 一个 queue,多个 consumer。 比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。 有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。 这不明显乱了。

Kafka: 比如说我们建了一个 topic,有三个 partition。 生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。 消费者从 partition 中取出来数据的时候,也一定是有顺序的。 到这里,顺序还是 ok 的,没有错乱。 接着,我们在消费者里可能会搞多个线程来并发处理消息。 因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。 而多个线程并发跑的话,顺序可能就乱掉了。

解决方案
RabbitMQ
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点; 或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

Kafka
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue; 然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

六、如何解决消息队列的延时以及过期失效问题? 消息队列满了以后该怎么处理? 有几百万消息持续积压几小时,说说怎么解决?

面试官心理分析
你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了; 或者消费的速度极其慢。 接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办? 或者是这整个就积压了几个小时,你这个时候怎么办? 或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?
所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。 一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了; 或者是消费端出了个什么岔子,导致消费速度极其慢。
面试题剖析
关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。
大量消息在 mq 里积压了几个小时了还没解决
几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。 这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。 这个肯定不能在面试的时候说吧。
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。 所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下: 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。 这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
mq 中的消息过期失效了
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。 如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。 那这就是第二个坑了。 这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。 我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。 就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。 这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。 也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 都快写满了
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办? 这个还有别的办法吗? 没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。 然后走第二个方案,到了晚上再补数据吧。
七、如果让你写一个消息队列,该如何进行架构设计? 说一下你的思路。
面试官心理分析
其实聊到这个问题,一般面试官要考察两块:
你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个消息队列的架构原理。 看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来。 说实话,问类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西。 类似的问题,比如,如果让你来设计一个 Spring 框架你会怎么做? 如果让你来设计一个 Dubbo 框架你会怎么做? 如果让你来设计一个 MyBatis 框架你会怎么做?
面试题剖析
其实回答这类问题,说白了,不求你看过那技术的源码,起码你要大概知道那个技术的基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好。
比如说这个消息队列系统,我们从以下几个角度来考虑一下: 首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞? 设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。 如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了? 其次你得考虑一下这个 mq 的数据要不要落地磁盘吧? 那肯定要了,落磁盘才能保证别进程挂了数据就丢了。 那落磁盘的时候怎么落啊? 顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。 其次你考虑一下你的 mq 的可用性啊? 这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。 多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。 能不能支持数据 0 丢失啊? 可以的,参考我们之前说的那个 kafka 数据零丢失方案。
mq 肯定是很复杂的,面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。 确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。


参考文章:
https://mp.weixin.qq.com/s/3GMs3ae7ffDFgia9VSDMEg
https://mp.weixin.qq.com/s/hAw2KEnZJNIq_qVw8H1UMg
大数据
2020-02-12 10:40:00
本文首发于公众号「Python知识圈」,如需转载,请在公众号联系作者授权。
前言
上一篇文章整理了的公众号所有文章的导航链接,其实如果手动整理起来的话,是一件很费力的事情,因为公众号里添加文章的时候只能一篇篇的选择,是个单选框。
面对几百篇的文章,这样一个个选择的话,是一件苦差事。
pk哥作为一个 Pythoner,当然不能这么低效,我们用爬虫把文章的标题和链接等信息提取出来。
抓包
我们需要通过抓包提取公众号文章的请求的 URL,参考之前写过的一篇抓包的文章 Python爬虫APP前的准备 ,pk哥这次直接抓取 PC 端微信的公众号文章列表信息,更简单。
我以抓包工具 Charles 为例,勾选容许抓取电脑的请求,一般是默认就勾选的。
为了过滤掉其他无关请求,我们在左下方设置下我们要抓取的域名。
打开 PC 端微信,打开 「Python知识圈」公众号文章列表后,Charles 就会抓取到大量的请求,找到我们需要的请求,返回的 JSON 信息里包含了文章的标题、摘要、链接等信息,都在 comm_msg_info 下面。

这些都是请求链接后的返回,请求链接 url 我们可以在 Overview 中查看。
通过抓包获取了这么多信息后,我们可以写爬虫爬取所有文章的信息并保存了。
初始化函数
公众号历史文章列表向上滑动,加载更多文章后发现链接中变化的只有 offset 这个参数,我们创建一个初始化函数,加入代理 IP,请求头和信息,请求头包含了 User-Agent、Cookie、Referer。
这些信息都在抓包工具可以看到。
请求数据
通过抓包分析出来了请求链接,我们就可以用 requests 库来请求了,用返回码是否为 200 做一个判断,200 的话说明返回信息正常,我们再构建一个函数 parse_data() 来解析提取我们需要的返回信息。 def request_data(self): try: response = requests.get(self.base_url.format(self.offset), headers=self.headers, proxies=self.proxy) print(self.base_url.format(self.offset)) if 200 == response.status_code: self.parse_data(response.text) except Exception as e: print(e) time.sleep(2) pass
提取数据
通过分析返回的 Json 数据,我们可以看到,我们需要的数据都在 app_msg_ext_info 下面。
我们用 json.loads 解析返回的 Json 信息,把我们需要的列保存在 csv 文件中,有标题、摘要、文章链接三列信息,其他信息也可以自己加。 def parse_data(self, responseData): all_datas = json.loads(responseData) if 0 == all_datas['ret'] and all_datas['msg_count']>0: summy_datas = all_datas['general_msg_list'] datas = json.loads(summy_datas)['list'] a = [] for data in datas: try: title = data['app_msg_ext_info']['title'] title_child = data['app_msg_ext_info']['digest'] article_url = data['app_msg_ext_info']['content_url'] info = {} info['标题'] = title info['小标题'] = title_child info['文章链接'] = article_url a.append(info) except Exception as e: print(e) continue print('正在写入文件') with open('Python公众号文章合集1.csv', 'a', newline='', encoding='utf-8') as f: fieldnames = ['标题', '小标题', '文章链接'] # 控制列的顺序 writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(a) print("写入成功") print('----------------------------------------') time.sleep(int(format(random.randint(2, 5)))) self.offset = self.offset+10 self.request_data() else: print('抓取数据完毕!')
这样,爬取的结果就会以 csv 格式保存起来。
运行代码时,可能会遇到 SSLError 的报错,最快的解决办法就是 base_url 前面的 https 去掉 s 再运行。
保存markdown格式的链接
经常写文章的人应该都知道,一般写文字都会用 Markdown 的格式来写文章,这样的话,不管放在哪个平台,文章的格式都不会变化。
在 Markdown 格式里,用 [文章标题](文章url链接) 表示,所以我们保存信息时再加一列信息就行,标题和文章链接都获取了,Markdown 格式的 url 也就简单了。 md_url = '[{}]'.format(title) + '({})'.format(article_url)
爬取完成后,效果如下。
我们把 md链接这一列全部粘贴到 Markdown 格式的笔记里就行了,大部分的笔记软件都知道新建 Markdown 格式的文件的。
这样,这些导航文章链接整理起来就是分类的事情了。
你用 Python 解决过生活中的小问题吗?欢迎留言讨论。
欢迎关注公众号「Python知识圈」,公众号后台回复关键字,获取更多干货。 回复「英语」:送你英语 7000 单词速记法,亲测非常有效。 回复「编程」:免费获赠2019最新编程资料,认真学完BAT offer 拿到手软。 回复「赚钱」:领取简单可实操的 36 个赚钱的小项目,每天多赚100块零花钱。 回复「电子书」:免费送你10本Python电子书。
大数据
2020-02-11 22:45:02
随着数据中台的概念愈发火热,越来越多的技术公司开始慢慢驶入中台的赛道,无论是数据中台、技术中台还是业务中台等等,只要与中台沾上边儿的,大家理解的概念与期待产品应该有的样子都各有不同又自成体系。也正因为此,被“中台的风”吹着跑的各个企业决策者们,也对这个概念愈发好奇与着迷,似乎只要快速搭上“中台”的这趟车,就能让企业摇身一变,迅速转型,进而成为行业佼佼者。而现实中,中台真的是企业的灵丹妙药吗?而数澜的数栖平台又与数据中台有什么关系呢?
先让我们来回顾一下,中台的概念是如何在国内突然爆红的。
一、中台,风从哪儿来?
「中台」,原本是一种美军作战概念,即通过高效、统一的后方系统,来支持前端的机动部队,提高作战效率,减少冗余投入。
而这一概念在国内的开端,依据现在普遍流行的说法,则源于马云 2015 年带领团队对开发出《部落冲突》、《皇室战争》等手游的芬兰公司 Supercell 的一次拜访。这家仅有 200 人不到的小公司,在 2015 财年已创造出 23.3 亿美元的营收。而彼时的阿里,员工人数 3.4W,2015 财年的营收为 122.93 亿美元。
能够支撑 Supercell 公司这种高效散兵作战模式的基础,是他们经过 6 年时间沉淀下来的游戏中台。中台将游戏开发过程中公共、通用的游戏素材和算法整理起来,可以同时支持几个小团队在几周时间内研发出一款新游戏,并能鼓励员工充分试错。但在愈发庞大的阿里生态体系内部,则因为业务的快速扩张和增长,出现了不同业务线之间“烟囱林立”、资源利用率低的问题。同时,部门之间常常因为所谈合作不能立即产生收益,基于 KPI 的问题,最终都会被废止掉。长此以往,公司的创新力实际上也会逐渐下降。
因此,马云回国后,便开始全面推广「中台」战略。并于同年底基于「大中台,小前台」的战略,对组织架构进行了全面彻底的调整。
在阿里已经成功实施内部「中台」战略的两年后,也就是从 2018 年底到 2019 年初这短短半年内,各个大型互联网企业开始进行大规模组织架构调整。
2018 年,商业大环境开始发生变化。To C 业务开始逐渐进入瓶颈期,以往靠着流量红利疯狂扩张业务的大型互联网企业,已经无法找到当年百试百灵的路数了。面对整个互联网行业「水温」的变化,企业意识到降本增效势在必行,转型同样势在必行。于是,各大型互联网企业开始向 To B 业务模式转型。同时,企业内部管理也开始走向精益化,以往的疯狂扩张导致的各种「业务烟囱」、「部门墙」都在推倒名单中。
当云计算、大数据、人工智能、支付能力,成为企业转型中业务较量的关键点时,数据能力、算法能力、调度能力的沉淀,则成为了考验企业内部是否能够快速支撑前台业务,实现企业转型的重要环节。而这些能力的背后,中台的重要性不言而喻。
因此,「旁观者们」从行动上,正式入局。
腾讯的「930 变革」、京东的中台战略、美团的数据全量打通、字节跳动的「直播大中台」、百度的 All in AI 战略,都让中台概念持续升温,也让整个行业热度上涨不断。
二、数据中台,到底是什么?
数澜科技铁教授曾在《 数据中台系列(一):你的企业真的需要「数据中台」吗? 》一文中提出:
数据中台,它不仅仅是我们平时会提到的任何一种工具,也不仅仅是一种企业协同工作的方法,更不能把它当做是一个简单的组织架构。
「 数据中台,包括平台、工具、数据、组织、流程、规范等一切与企业数据资产如何用起来所相关的。 企业所属行业不同,经营策略不同,从而数据场景也千差万别。再加上企业人员运用数据的能力参差不齐,这就导致了 每一家企业的数据中台都是独一无二的,不是购买一个所谓的数据中台工具就能解决的。 当然合适的工具是可以降低企业应用数据难度的,这是强调的是「合适的」,而不是「高级的」。」
数澜数据中台理念
而在数澜看来,数据中台是一种战略选择和组织形式,通过有型的产品支撑和实施方法论,解决大企业面临的数据孤岛、数据维护混乱、数据价值利用低的问题,依据企业特有的业务和架构,构建一套源源不断地把数据变成资产并服务于业务的,可持续让企业数据用起来的机制,让数据可见、可懂、可用、可运营。
它的出现,基于以下两个大前提:
1)丰富的数据维度;
以阿里巴巴为例,TCIF & IDMAPPING,淘宝消费者信息工厂和用户识别,打通了阿里集团所有相关业务域,建立了几千个标签来刻画用户画像。比如:你的真实性别、购物性别、音乐风格偏爱是「R&B」、你的线上购物行为特征是「爱薅羊毛还是财大气粗」等等。如果没有这些用户数据维度,标签的建立无法做到大而全,也就无法提升用户画像的精准度。
2)多个大数据场景。
同样,数据服务支撑了阿里妈妈、淘宝、天猫、支付宝等多个业务板块的场景,每天都有上亿的调用次数。通过业务效果反馈,进而不断优化调整数据和模型。现在许多企业想要建设数据中台,却发现没有实际的数据应用场景,无法进行切入。
三、数栖平台,就是数澜的数据中台吗?
正如上文提到的,数据中台是各个企业独有的一种战略选择和组织形式,市面上不可能存在数据中台这样的一个产品。
数澜 CEO 风剑在面对记者采访时曾说: 「在我看来,但凡说销售数据中台产品的人,都是在忽悠。我对数据中台的定义是,数据中台绝对是不可复制的;数据中台是企业管理与运营的一套机制,一套让数据可用起来的可持续的机制。 企业需要一套自己的数据资产管理体系与框架,但不可能存在一个所有企业和组织都适用的、通用的数据中台框架。 」
也因此,数澜的数栖平台,仅仅是一个一站式数据应用基础设施,它是在帮助用户搭建自有数据中台过程中,必不可少的一套工具。通过数栖平台,企业可以让自己业务沉淀多年的数据融汇打通,同时,通过开发平台可以帮助企业内部的开发同学快速的进行对数据的处理,将数据成体系有逻辑的被管理起来,并且开发出更多可被业务使用的标签,向上层提供更多的弹药。
数栖平台架构
而仅仅通过数栖平台,企业很难快速的构建起完整的数据中台。在这个过程中,就需要客户对于自身业务及数据的全流程完整梳理,同时也需要数澜将沉淀多年的中台建设方法论融入到帮助客户共同建设数据中台的每一个细节中。只有科学的工具+先进的方法论,二者结合在一起,才能够实现最终的目标。
扫 它 ↓ 深 入 了 解 数 栖 呀!
大数据
2019-11-29 10:15:00
描述:今天早上到公司,发现测试集群中的一台机器的磁盘使用率100%,而其他节点的磁盘使用率只有30%左右,检查磁盘的使用情况后,使用率饱满的机器上,90%的数据都是/dfs目录下的,因为只是昨天项目测试刚跑进来的数据,删是不可能的,所以只能想办法对集群中的数据进行平衡。
引起这种情况的方式很多:
1. 添加新的Datanode节点
2. 人为干预将数据的副本数降低或者增加
我们都知道当HDFS出现数据不平衡的时候,就会造成MapReduce或Spark等应用程序无法很好的利用本地计算的优势,而且Datanode节点之间也没有更好的网络带宽利用率,某些Datanode节点的磁盘无法使用等等问题。
在Hadoop中,提供了hdfs balancer程序用来保证HDFS的数据平衡,我们先看一下这个程序的参数:
hdfs balancer --help
Usage: hdfs balancer
[-policy ] the balancing policy: datanode or blockpool
[-threshold ] Percentage of disk capacity
[-exclude [-f | ]] Excludes the specified datanodes.
[-include [-f | ]] Includes only the specified datanodes.
[-idleiterations ] Number of consecutive idle iterations (-1 for Infinite) before exit.
[-runDuringUpgrade] Whether to run the balancer during an ongoing HDFS upgrade.This is usually not desired since it will not affect used space on over-utilized machines.
Generic options supported are
-conf specify an application configuration file
-D use value for given property
-fs specify a namenode
-jt specify a ResourceManager
-files specify comma separated files to be copied to the map reduce cluster
-libjars specify comma separated jar files to include in the classpath.
-archives specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
选项的含义根据描述应该很好理解,其中-threshold参数是用来判断数据平衡的依据,值范围为0-100。默认值为10,表示HDFS达到平衡状态的磁盘使用率偏差值为10%,如果机器与机器之间磁盘使用率偏差小于10%,那么我们就认为HDFS集群已经达到了平衡的状态。
我们可以从CDH平台的CM上看到该参数是默认值和含义:
该参数具体含义为:判断集群是否平衡的目标参数,每一个 Datanode 存储使用率和集群总存储使用率的差值都应该小于这个阀值,理论上,该参数设置的越小,整个集群就越平衡,但是在线上环境中,Hadoop集群在进行balance时,还在并发的进行数据的写入和删除,所以有可能无法到达设定的平衡参数值。
参数-policy表示的平衡策略,默认为DataNode。
该参数的具体含义为:应用于重新平衡 HDFS 存储的策略。默认DataNode策略平衡了 DataNode 级别的存储。这类似于之前发行版的平衡策略。BlockPool 策略平衡了块池级别和 DataNode 级别的存储。BlockPool 策略仅适用于 Federated HDFS 服务。
参数-exclude和-include是用来选择balancer时,可以指定哪几个DataNode之间重分布,也可以从HDFS集群中排除哪几个节点不需要重分布,比如:
hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO
除了上面的参数会影响HDFS数据重分布,还有如下的参数也会影响重分布,
dfs.datanode.balance.bandwidthPerSec, dfs.balance.bandwidthPerSec
该默认设置:1048576(1M/s),个人建议如果机器的网卡和 交换机 的带宽有限,可以适当降低该速度,一般默认就可以了。
该参数含义如下:
HDFS平衡器检测集群中使用过度或者使用不足的DataNode,并在这些DataNode之间移动数据块来保证负载均衡。如果不对平衡操作进行带宽限制,那么它会很快就会抢占所有的网络资源,不会为Mapreduce作业或者数据输入预留资源。参数dfs.balance.bandwidthPerSec定义了每个DataNode平衡操作所允许的最大使用带宽,这个值的单位是byte,这是很不直观的,因为网络带宽一般都是用bit来描述的。因此,在设置的时候,要先计算好。DataNode使用这个参数来控制网络带宽的使用,但不幸的是,这个参数在守护进程启动的时候就读入,导致管理员没办法在平衡运行时来修改这个值,如果需要调整就要重启集群。
下面简单介绍一下balancer的原理:
Rebalance程序作为一个独立的进程与NameNode进行分开执行。
步骤1:
Rebalance Server从NameNode中获取所有的DataNode情况:每一个DataNode磁盘使用情况。
步骤2:
Rebalance Server计算哪些机器需要将数据移动,哪些机器可以接受移动的数据。并且从NameNode中获取需要移动的数据分布情况。
步骤3:
Rebalance Server计算出来可以将哪一台机器的block移动到另一台机器中去。
步骤4,5,6:
需要移动block的机器将数据移动的目的机器上去,同时删除自己机器上的block数据。
步骤7:
Rebalance Server获取到本次数据移动的执行结果,并继续执行这个过程,一直没有数据可以移动或者HDFS集群以及达到了平衡的标准为止。
实战:
找一个比较空闲的的Datanode执行,建议不要在NameNode执行:
hdfs balancer -include CDHD,CDHA,CDHM,CDHT,CDHO
执行过程如下(部分),大家可以对照上面的流程看日志,可能会更清楚一点:
16/07/11 09:35:12 INFO balancer.Balancer: namenodes = [hdfs://CDHB:8022]
16/07/11 09:35:12 INFO balancer.Balancer: parameters = Balancer.Parameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, number of nodes to be excluded = 0, number of nodes to be included = 5, run during upgrade = false]
Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved
16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.130:50010
16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.131:50010
16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.135:50010
16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.138:50010
16/07/11 09:35:14 INFO net.NetworkTopology: Adding a new node: /default/192.168.1.139:50010
16/07/11 09:35:14 INFO balancer.Balancer: 2 over-utilized: [192.168.1.130:50010:DISK, 192.168.1.135:50010:DISK]
16/07/11 09:35:14 INFO balancer.Balancer: 1 underutilized: [192.168.1.131:50010:DISK]
16/07/11 09:35:14 INFO balancer.Balancer: Need to move 203.48 GB to make the cluster balanced.
16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK
16/07/11 09:35:14 INFO balancer.Balancer: Decided to move 10 GB bytes from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK
16/07/11 09:35:14 INFO balancer.Balancer: Will move 20 GB in this iteration
16/07/11 09:36:00 INFO balancer.Dispatcher: Successfully moved blk_1074048042_307309 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010
16/07/11 09:36:07 INFO balancer.Dispatcher: Successfully moved blk_1074049886_309153 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010
16/07/11 09:36:09 INFO balancer.Dispatcher: Successfully moved blk_1074048046_307313 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010
16/07/11 09:36:10 INFO balancer.Dispatcher: Successfully moved blk_1074049900_309167 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010
16/07/11 09:36:16 INFO balancer.Dispatcher: Successfully moved blk_1074048061_307328 with size=134217728 from 192.168.1.130:50010:DISK to 192.168.1.131:50010:DISK through 192.168.1.130:50010
16/07/11 09:36:17 INFO balancer.Dispatcher: Successfully moved blk_1074049877_309144 with size=134217728 from 192.168.1.135:50010:DISK to 192.168.1.138:50010:DISK through 192.168.1.135:50010
如果你使用的是CDH集成平台,也可以通过CM来执行数据重分布:
步骤1:先选择HDFS 组件 的页面,如下:
步骤2:找到页面右侧的操作选择,从下拉框中选择数据“重新平衡”选项

步骤3:确定“重新平衡”就开始安装默认的设置规则重新分布DataNode的Block数据了,可以用CM的日志中查看具体的执行过程。
参考博客: https://www.2cto.com/net/201607/525222.html
大数据
2019-11-29 09:28:00
经过一段时间的演化,spark-binlog,delta-plus慢慢进入正轨。spark-binlog可以将MySQL binlog作为标准的Spark数据源来使用,目前支持insert/update/delete 三种事件的捕捉。 delta-plus则是对Delta Lake的一个增强库,譬如在Delta Plus里实现了将binlog replay进Detla表,从而保证Delta表和数据库表接近实时同步。除此之外,detla-plus还集成了譬如布隆过滤器等来尽快数据更新更新速度。更多特性可参考我写的专栏。
数据湖Delta Lake 深入解析 ​ zhuanlan.zhihu.com 图标 有了这两个库,加上Spark,我们就能通过两行代码完成库表的同步。
以前如果要做数据增量同步,大概需要这么个流程:
问题很明显,Pipeline长,涉及到技术多,中间转存其实也挺麻烦的,难做到实时。我们希望可以更简单些,比如最好是这样:
然后我可能只要写如下代码就可以搞定:
val spark: SparkSession = ???
val df = spark.readStream. format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource"). option("host","127.0.0.1"). option("port","3306"). option("userName","xxxxx"). option("password","xxxxx"). option("databaseNamePattern","mlsql_console"). option("tableNamePattern","script_file"). optioin("binlogIndex","4"). optioin("binlogFileOffset","4"). load()
df.writeStream. format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
option(" path ","/tmp/sync/tables"). option("mode","Append"). option("idCols","id"). option("duration","5"). option("syncType","binlog"). checkpointLocation("/tmp/cpl-binlog2") .mode(OutputMode.Append).save("{db}/{table}") 读和写,非常简单。读你需要提供MySQL binlog信息,写的时候指定主键,以及表的存储路径。
如果使用MLSQL则更简单,下面是一个完整的流式同步脚本:
set streamName="binlog";
load binlog.`` where host="127.0.0.1" and port="3306" and userName="xxxx" and password="xxxxxx" and bingLogNamePrefix="mysql-bin" and binlogIndex="4" and binlogFileOffset="4" and databaseNamePattern="mlsql_console" and tableNamePattern="script_file" as table1;
save append table1
as rate. mysql_{db}.{table} options mode="Append" and idCols="id" and duration="5" and syncType="binlog" and checkpointLocation="/tmp/cpl-binlog2";
因为是增量同步,所以第一次需要先全量同步一次,用MLSQL也很简单:
connect jdbc where url="jdbc: mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false " and driver="com.mysql.jdbc.Driver" and user="xxxxx" and password="xxxx" as db_cool;
load jdbc. db_cool.script_file as script_file; save overwrite script_file as delta. mysql_mlsql_console.script_file ;
load delta. mysql_mlsql_console.script_file as output; 如果你使用了Console则可在编辑器里直接运行:
如果你安装了binlog2delta插件, 则可享受向导便利:
大数据
2019-11-28 21:45:00