数据专栏

智能大数据搬运工,你想要的我们都有

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
一、
1、什么是Hbase。
是一个高可靠性、高性能、列存储、可伸缩、实时读写的分布式数据库系统。
适合于存储非结构化数据,基于列的而不是基于行的模式
如图:Hadoop生态中HBase与其他部分的关系。
2、关系数据库已经流行很多年,并且Hadoop已经有了HDFS和MapReduce,为什么需要HBase?
Hadoop可以很好地解决大规模数据的离线批量处理问题,但是,受限于HadoopMapReduce编程框架的高延迟数据处理机制,使得Hadoop无法满足大规模数据实时处理应用的需求
HDFS面向批量访问模式,不是随机访问模式
传统的通用关系型数据库无法应对在数据规模剧增时导致的系统扩展性和性能问题(分库分表也不能很好解决)
传统关系数据库在数据结构变化时一般需要停机维护;空列浪费存储空间
因此,业界出现了一类面向半结构化数据存储和处理的高可扩展、低写入/查询延迟的系统,例如,键值数据库、文档数据库和列族数据库(如BigTable和HBase等)
HBase已经成功应用于互联网服务领域和传统行业的众多在线式数据分析处理系统中
3、HBase与传统的关系数据库的区别


(1)数据类型:关系数据库采用关系模型,具有丰富的数据类型和存储方式,HBase则采用了更加简单的数据模型,它把数据存储为未经解释的字符串
(2)数据操作:关系数据库中包含了丰富的操作,其中会涉及复杂的多表连接。HBase操作则不存在复杂的表与表之间的关系,只有简单的插入、查询、删除、清空等,因为HBase在设计上就避免了复杂的表和表之间的关系
(3)存储模式:关系数据库是基于行模式存储的。HBase是基于列存储的,每个列族都由几个文件保存,不同列族的文件是分离的
(4)数据索引:关系数据库通常可以针对不同列构建复杂的多个索引,以提高数据访问性能。HBase只有一个索引——行键,通过巧妙的设计,HBase中的所有访问方法,或者通过行键访问,或者通过行键扫描,从而使得整个系统不会慢下来
(5)数据维护:在关系数据库中,更新操作会用最新的当前值去替换记录中原来的旧值,旧值被覆盖后就不会存在。而在HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧有的版本仍然保留
(6)可伸缩性:关系数据库很难实现横向扩展,纵向扩展的空间也比较有限。相反,HBase和BigTable这些分布式数据库就是为了实现灵活的水平扩展而开发的,能够轻易地通过在集群中增加或者减少硬件数量来实现性能的伸缩
二、Hbase数据模型

1、模型概述

HBase是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳
每个值是一个未经解释的字符串,没有数据类型
用户在表中存储数据,每一行都有一个可排序的行键和任意多的列
表在水平方向由一个或者多个列族组成,一个列族中可以包含任意多个列,同一个列族里面的数据存储在一起
列族支持动态扩展,可以很轻松地添加一个列族或列,无需预先定义列的数量以及类型,所有列均以字符串形式存储,用户需要自行进行数据类型转换
HBase中执行更新操作时,并不会删除数据旧的版本,而是生成一个新的版本,旧有的版本仍然保留(这是和HDFS只允许追加不允许修改的特性相关的)
2、数据坐标
HBase中需要根据行键、列族、列限定符和时间戳来确定一个单元格,因此,可以视为一个“四维坐标”,即[行键,列族, 列限定符,时间戳]

[“201505003”,“Info”,“email”, 1174184619081]
[“201505003”,“Info”,“email”, 1174184620720]
“xie@qq.com”
“you@163.com”
3、概念视图

4、物理视图
三、HBase实现原理
1、HBase的实现包括三个主要的功能组件:
(1)库函数:链接到每个客户端
(2)一个Master主服务器
(3)许多个Region服务器
主服务器Master负责管理和维护HBase表的分区信息,维护Region服务器列表,分配Region,负载均衡
Region服务器负责存储和维护分配给自己的Region,处理来自客户端的读写请求
客户端并不是直接从Master主服务器上读取数据,而是在获得Region的存储位置信息后,直接从Region服务器上读取数据
客户端并不依赖Master,而是通过Zookeeper来获得Region位置信息,大多数客户端甚至从来不和Master通信,这种设计方式使得Master负载很小
2、Region
开始只有一个Region,后来不断分裂
Region拆分操作非常快,接近瞬间,因为拆分之后的Region读取的仍然是原存储文件,直到“合并”过程把存储文件异步地写到独立的文件之后,才会读取新文件
同一个Region不会被分拆到多个Region服务器
每个Region服务器存储10-1000个Region

元数据表,又名.META.表,存储了Region和Region服务器的映射关系
当HBase表很大时, .META.表也会被分裂成多个Region
根数据表,又名-ROOT-表,记录所有元数据的具体位置
-ROOT-表只有唯一一个Region,名字是在程序中被写死的
Zookeeper文件记录了-ROOT-表的位置

客户端访问数据时的“三级寻址”
为了加速寻址,客户端会缓存位置信息,同时,需要解决缓存失效问题
寻址过程客户端只需要询问Zookeeper服务器,不需要连接Master服务器

3、HBase的三层结构中各层次的名称和作用
层次 名称 作用
第一层 Zookeper文件 记录了-ROOT-表的位置信息
第二层
第三层
-ROOT-表
.META.表
记录了.META.表的Region位置信息
-ROOT-表只能有一个Region。通过-ROOT-表,就可以访问.META.表中的数据
记录了用户数据表的Region位置信息,.META.表可以有多个Region,保存了HBase中所有用户数据表的Region位置信息
四、HBase运行机制
1、HBase系统架构

(1、客户端包含访问HBase的接口,同时在缓存中维护着已经访问过的Region位置信息,用来加快后续数据访问过程

(2、Zookeeper可以帮助选举出一个Master作为集群的总管,并保证在任何时刻总有唯一一个Master在运行,这就避免了Master的“单点失效”问题
(Zookeeper是一个很好的集群管理工具,被大量用于分布式计算,提供配置维护、域名服务、分布式同步、组服务等。)
(3. Master
主服务器Master主要负责表和Region的管理工作:
管理用户对表的增加、删除、修改、查询等操作
实现不同Region服务器之间的负载均衡
在Region分裂或合并后,负责重新调整Region的分布
对发生故障失效的Region服务器上的Region进行迁移
(4. Region服务器
Region服务器是HBase中最核心的模块,负责维护分配给自己的Region,并响应用户的读写请求
2、Region

(1、用户读写数据过程
用户写入数据时,被分配到相应Region服务器去执行
用户数据首先被写入到MemStore和Hlog中
只有当操作写入Hlog之后,commit()调用才会将其返回给客户端
当用户读取数据时,Region服务器会首先访问MemStore缓存,如果找不到,再去磁盘上面的StoreFile中寻找
(2、缓存的刷新
系统会周期性地把MemStore缓存里的内容刷写到磁盘的StoreFile文件中,清空缓存,并在Hlog里面写入一个标记、
每次刷写都生成一个新的StoreFile文件,因此,每个Store包含多个StoreFile文件

每个Region服务器都有一个自己的HLog文件,每次启动都检查该文件,确认最近一次执行缓存刷新操作之后是否发生新的写入操作;如果发现更新,则先写入MemStore,再刷写到StoreFile,最后删除旧的Hlog文件,开始为用户提供服务
(3、StroreFile的合并
每次刷写都生成一个新的StoreFile,数量太多,影响查找速度
调用Store.compact()把多个合并成一个
合并操作比较耗费资源,只有数量达到一个阈值才启动合并
3、Store工作原理
Store是Region服务器的核心
多个StoreFile合并成一个
触发分裂操作,1个父Region被分裂成两个子Region
单个StoreFile过大时,又
4、HLog工作原理
分布式环境必须要考虑系统出错。HBase采用HLog保证系统恢复
HBase系统为每个Region服务器配置了一个HLog文件,它是一种预写式日志(WriteAhead Log)
用户更新数据必须首先写入日志后,才能写入MemStore缓存,并且,直到MemStore缓存内容对应的日志已经写入磁盘,该缓存内容才能被刷写到磁盘
Zookeeper会实时监测每个Region服务器的状态,当某个Region服务器发生故障时,Zookeeper会通知Master
Master首先会处理该故障Region服务器上面遗留的HLog文件,这个遗留的HLog文件中包含了来自多个Region对象的日志记录
系统会根据每条日志记录所属的Region对象对HLog数据进行拆分,分别放到相应Region对象的目录下,然后,再将失效的Region重新分配到可用的Region服务器中,并把与该Region对象相关的HLog日志记录也发送给相应的Region服务器
Region服务器领取到分配给自己的Region对象以及与之相关的HLog日志记录以后,会重新做一遍日志记录中的各种操作,把日志记录中的数据写入到MemStore缓存中,然后,刷新到磁盘的StoreFile文件中,完成数据恢复
共用日志优点:提高对表的写操作性能;缺点:恢复时需要分拆日志
五、HBase性能
1、行键(RowKey)
行键是按照字典序存储,因此,设计行键时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为行键的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE- timestamp作为行键,这样能保证新写入的数据在读取时可以被快速命中。
InMemory:创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到Region服务器的缓存中,保证在读取的时候被cache命中。

Max Version:创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。

Time To Live创建表的时候,可以通过HColumnDescriptor.setTimeToLive(inttimeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2* 24 * 60 * 60)。

2、HBaseMaster默认基于Web的UI服务端口为60010,HBase region服务器默认基于Web的UI服务端口为60030.如果master运行在名为master.foo.com的主机中,mater的主页地址就是http://master.foo.com:60010,用户可以通过Web浏览器输入这个地址查看该页面
可以查看HBase集群的当前状态
3、NoSQL区别于关系型数据库的一点就是NoSQL不使用SQL作为查询语言,至于为何在NoSQL数据存储HBase上提供SQL接口

易使用,减少编码
4、HBase只有一个针对行健的索引
访问HBase表中的行,只有三种方式:
通过单个行健访问
通过一个行健的区间来访问
全表扫描
总结:
1、HBase数据库是BigTable的开源实现,和BigTable一样,支持大规模海量数据,分布式并发数据处理效率极高,易于扩展且支持动态伸缩,适用于廉价设备
2、HBase可以支持NativeJava API、HBaseShell、ThriftGateway、Hive等多种访问接口,可以根据具体应用场合选择相应访问方式
3、HBase实际上就是一个稀疏、多维、持久化存储的映射表,它采用行键、列键和时间戳进行索引,每个值都是未经解释的字符串。
4、HBase采用分区存储,一个大的表会被分拆许多个Region,这些Region会被分发到不同的服务器上实现分布式存储
5、HBase的系统架构包括客户端、Zookeeper服务器、Master主服务器、Region服务器。客户端包含访问HBase的接口;Zookeeper服务器负责提供稳定可靠的协同服务;Master主服务器主要负责表和Region的管理工作;Region服务器负责维护分配给自己的Region,并响应用户的读写请求
HBase常用操作之namespace

1、介绍

在HBase中,namespace命名空间指对一组表的逻辑分组,类似RDBMS中的database,方便对表在业务上划分。Apache HBase从0.98.0, 0.95.2两个版本开始支持namespace级别的授权操作,HBase全局管理员可以创建、修改和回收namespace的授权。

2、namespace
HBase系统默认定义了两个缺省的namespace hbase :系统内建表,包括namespace和meta表 default :用户建表时未指定namespace的表都创建在此

创建namespace hbase>create_namespace 'ai_ns'
删除namespace hbase>drop_namespace 'ai_ns'
查看namespace hbase>describe_namespace 'ai_ns'
列出所有namespace hbase>list_namespace
在namespace下创建表 hbase>create 'ai_ns:testtable', 'fm1'
查看namespace下的表 hbase>list_namespace_tables 'ai_ns'

3、授权
具备Create权限的namespace Admin可以对表创建和删除、生成和恢复快照
具备Admin权限的namespace Admin可以对表splits或major compactions

授权tenant-A用户对ai_ns下的写权限 hbase>grant 'tenant-A' 'W' '@ai_ns'
回收tenant-A用户对ai_ns的所有权限 hbase>revoke 'tenant-A''@ai_ns'
当前用户:hbase hbase>namespace_create 'hbase_perf' hbase>grant 'mike', 'W', '@hbase_perf'
当前用户:mike hbase>create 'hbase_perf.table20', 'family1' hbase>create 'hbase_perf.table50', 'family1'
mike创建了两张表table20和table50,同时成为这两张表的owner,意味着有'RWXCA'权限
此时,mike团队的另一名成员alice也需要获得hbase_perf下的权限,hbase管理员操作如下
当前用户:hbase hbase>grant 'alice', 'W', '@hbase_perf'
此时alice可以在hbase_perf下创建表,但是无法读、写、修改和删除hbase_perf下已存在的表
当前用户:alice hbase>scan 'hbase_perf:table20'
报错AccessDeniedException
如果希望alice可以访问已经存在的表,则hbase管理员操作如下
当前用户:hbase hbase>grant 'alice', 'RW', 'hbase_perf.table20' hbase>grant 'alice', 'RW', 'hbase_perf.table50'

在HBase中启用授权机制
hbase-site.xml hbase.security.authorization true hbase.coprocessor.master.classes org.apache.hadoop.hbase.security.access.AccessController hbase.coprocessor.region.classes org.apache.hadoop.hbase.security.token.TokenProvider,org.apache.hadoop.hbase.security.access.AccessController
配置完成后需要重启HBase集群

授权相关JIRA
HBASE-8409
HBASE-9206

4、总结
HBase namespace特性是对表资源进行隔离的一种技术,隔离技术决定了HBase能否实现资源统一化管理的关键,提高了整体的安全性。
Hbase官方手册-快速入门
https://blog.csdn.net/maosijunzi/article/details/78481870?locationNum=9&fps=1
大数据
2018-07-26 18:52:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
博主在前面的2.5章节讲述了linux系统本地YUM服务器的搭建和httpd轻量级静态网站服务器的安装,本节博主将为大家分享内网环境中搭建自己的网络YUM服务器的全过程。如果大家对本地YUM服务器还不熟悉,请点击前往 大数据教程(2.5):Linux系统搭建本地YUM源服务器 。
先列出本次使用到的额外命令: rm -rf /var/www/html/centos 删除软连接 ln -s /mnt/cdrom ./centos 创建软连接 yum erase openssh-clients移除(卸载)软件
由于2.5章节已经安装好了本地YUM服务器和httpd服务,本节就不再累赘的去讲解。
网络YUM搭建步骤:
(一)按照2.5章节搭建好本地YUM、httpd服务
(二)无论哪种配置,都需要先将光盘挂在到本地文件目录中,命令:mount -t iso9660 /dev/cdrom /mnt/cdrom
(三)将光盘/mnt/cdrom 软连接到httpd服务器的/var/www/html目录中 (cd /var/www/html; ln -s /mnt/cdrom ./centos )
(四)启动httpd服务:service httpd start,并浏览器访问http://192.168.29.133/centos

(五)yum的客户端配置这个http地址到repo配置文件中 cd /etc/yum.repos.d/ rename .repo .repo.bak * cp centos-local.repo.bak centos-aaron-05.repo vi centos-aaron-05.repo
(六)清理YUM仓库缓存,使其生效 yum clean all 清理仓库缓存 yum repolist查看服务器上的yum仓库 yum list查看仓库中有哪些包

注意:为了避免每次重启后都要手动mount,可以在/etc/fstab中加入一行挂载配置,即可自动挂载
vi /etc/fstab
/dev/cdrom /mnt/cdrom iso9660 defaults 0 0
最后总结:以上就时内网YUM的配置全过程,其它服务器可以将YUM地址指向此httpd地址即可。如果大家觉得博主的文章不错,请点赞;如果您对其它服务器技术或者博主本人感兴趣,请关注博主博客,并且随时欢迎同博主交流。
大数据
2018-07-23 22:33:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
Hadoop--HDFS
Edits和Fsimage机制详解
概述
fsimage镜像文件包含了整个 HDFS 文件系统的所有目录和文件的indoe(节点)信息,比如:/node01/node,会记录每个节点nodeid,以及节点之间父子路径。
以及文件名,文件大小,文件被切成几块,每个数据块描述信息、修改时间、访问时间等;此外还有对目录的修改时间、访问权限控制信息(目录所属用户,所在组等)等。
另外,edits文件主要是在NameNode已经启动情况下对HDFS进行的各种更新操作进行记录,比如 :hadoop fs -mkdir hadoop fs -delete hadoop fs -put等。
对于每次事务操作,都会用一个TXID来标识,OP_MKDIR OP_DELETE等。
Edits文件存储的操作,而fsimage文件存储的是执行操作后,变化的状态。(元数据)
HDFS客户端执行所有的写操作都会被记录到edits文件中。
关键点
1.当执行格式化指令时候,会在指定元数据目录生成 dfs/name/current/
最开始只有fsimage,没有edits文件(因为没有启动HDFS)
2.当初次启动HFDS,会生成edits_inprogress_0000000000000000001,此文件用于记录事务(写操作)
3.HDFS对于每次写操作,都会用一个事务ID(TXID)来记录,TXID是递增的。
4.edits_0000000000000000003-0000000000000000007,数字表示的合并后起始的事务id和终止事务id
5.seen_txid 存储的当前的事务id,和edits_inprogress最后的数字一致
6.datanode存储块的目录路径:/tmp/dfs/data/current/BP-859711469-192.168.150.137-1535216211704/current/finalized/subdir0/subdir0
7.finalized此目录存储的已经存储完毕的数据块,rbw目录存的是正在写但还未写完的数据块
查看Edits文件和Fsimage文件
hdfs oev -i edits_0000000000000000001-0000000000000000003 -o edits.xml
hdfs oiv -i fsimage_0000000000000000012 -o fsimage.xml -p XML
HDFS API操作
1.创建java工程
2.导入hadoop依赖jar包
连接namenode以及读取hdfs中指定文件 @Test public void testConnectNamenode() throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.211:9000"), conf); InputStream in=fs.open(new Path("/park/1.txt")); OutputStream out=new FileOutputStream("1.txt"); IOUtils.copyBytes(in, out, conf); } ​

上传文件到hdfs上 代码样例
@Test public void testPut() throws Exception{ Configuration conf=new Configuration(); conf.set("dfs.replication","1"); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.21:9000"),conf,"root"); ByteArrayInputStream in=new ByteArrayInputStream("hello hdfs".getBytes()); OutputStream out=fs.create(new Path("/park/2.txt")); IOUtils.copyBytes(in, out, conf); } ​

从hdfs上删除文件
代码详解
@Test public void testDelete()throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.21:9000"),conf,"root"); //true表示无论目录是否为空,都删除掉。可以删除指定的文件 fs.delete(new Path("/park01"),true); //false表示只能删除不为空的目录。 fs.delete(new Path("/park01"),false); fs.close(); }

在hdfs上创建文件夹 代码详解
@Test ​ public void testMkdir()throws Exception{ ​ Configuration conf=new Configuration(); ​ FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.211:9000"),conf,"root"); ​ fs.mkdirs(new Path("/park02")); }

查询hdfs指定目录下的文件 代码详解
@Test public void testLs()throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.214:9000"),conf,"root"); RemoteIterator rt=fs.listFiles(new Path("/"), true); while(rt.hasNext()){ System.out.println(rt.next()); } }

递归查看指定目录下的文件 代码详解

@Test public void testLs()throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.214:9000"),conf,"root"); RemoteIterator rt=fs.listFiles(new Path("/"), true); while(rt.hasNext()){ System.out.println(rt.next()); } }

重命名 代码详解
@Test public void testCreateNewFile() throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.176:9000"),conf,"root"); fs.rename(new Path("/park"), new Path("/park01")); }

获取文件的块信息 代码详解
@Test public void testCopyFromLoaclFileSystem() throws Exception{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(new URI("hdfs://192.168.234.176:9000"),conf,"root"); BlockLocation[] data=fs.getFileBlockLocations(new Path("/park01/1.txt"),0,Integer.MaxValue); for(BlockLocation bl:data){ System.out.println(bl); } }

从HDFS下载文件过程
1.Client向namenode发起 Open file 请求。目的是获取指定文件的输入流。
namenode收到请求之后,会检查路径的合法性,此外,还是检查客户端的操作权限。如果检测未通过,则直接报错返回。后续过程不会发生。
2.Client也会向namenode发起:Getblockloaction请求,获取指定文件的元数据信息。如果第一步的检测通过,namenode会将元数据信息封装到输入流里,返回给客户端。
3.4 客户端根据元数据信息,直接去对应的datanode读取文件块,然后下载到本地(创建本地的输出流,然后做流的对接)
5.读完后,关流。
上传文件到HDFS
1.Client向namenode发现 Create file请求,目的是获取HDFS文件的输出流。namenode收到请求后,会检测路径的合法性和权限。如果检测未通过,直接报错返回。
如果通过检测,namenode会将文件的切块信息(比如文件被切成几块,每个文件块的副本存在哪台datanode上),然后把这些信息封装到输出流里,返回给客户端。
所以注意:文件块的输出(上传)是客户端直接和对应DN交互的,namenode的作用是告诉Client文件块要发送给哪个datanode上。
2.Client通过输出流,发送文件块(底层会将一个文件块打散成一个一个的packet,每个packet的大小=64kb)。这个过程的机制,叫Pipeline(数据流管道机制)
这种机制的目的:
为了提高网络效率,我们采取了把数据流和控制流分开的措施。在控制流从客户机到主Chunk、然后冉 到所有二级副本的同时,数据以管道的方式,顺序的沿着一个精心选择的Chunk服务器推送。我们的目标 是充分利用每台机器的带宽,避免网络瓶颈和高延时的连接,最小化推送所有数据的延时。 为了充分利用每台机器的带宽,数据沿着一个Chunk服务器顺序的推送,而不是以其它拓扑形式分散 推送(例如,树型拓扑结构)。线性推送模式下,每台机器所有的出凵带宽都用于以最快的速度传输数据,而 不是在多个接受者之间分配带宽。
3.4.5 。通过数据流管道机制,实现数据的发送和副本的复制。每台datanode服务器收到数据之后,会向上游反馈ack确认机制。直到第五步的ack发送给Client之后,再发送下一个packet。依次循环,直到所有的数据都复制完毕。此外,在底层传输的过程中,会用到全双工通信。
补充:建议看《Google File System》的3.2节
6.数据上传完之后,关流。
从HDFS删除文件的流程
1、客户端向namenode发现 删除文件指令,比如: 代码详解
hadoop fs -rm /park01/1.txt


2、namenode收到请求后,会检查路径的合法性以及权限
3、如果检测通过,会将对应的文件从元数据中删除。(注意,此时这个文件并没有真正从集群上被删除)
4、每台datanode会定期向namenode发送心跳,会领取删除的指令,找到对应的文件块,进行文件块的删除。
HDFS的租约机制
HDFS的有个内部机制:不允许客户端的并行写。指的是同一时刻内,不允许多个客户端向一个HDFS上写数据。
所以要实现以上的机制,实现思路就是用互斥锁,但是如果底层要是用简单的互斥锁,可能有与网络问题,造成客户端不释放锁,而造成死锁。所以Hadoop为了避免这种情况产生,引入租约机制。
租约锁本质上就是一个带有租期的互斥锁。
Hadoop的思想来自于Google的论文,3.1
Hadoop 租约锁对应的类:org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease
还有一个租约锁管理者:
org.apache.hadoop.hdfs.server.namenode.LeaseManager
HDFS特点
1、分布式存储架构,支持海量数据存储。(GB、TB、PB级别数据)
2、高容错性,数据块拥有多个副本(副本冗余机制)。副本丢失后,自动恢复。
3、低成本部署,Hadoop可构建在廉价的服务器上。
4、能够检测和快速应对硬件故障,通过RPC心跳机制来实现。
5、简化的一致性模型,这里指的是用户在使用HDFS时,所有关于文件相关的操作,比如文件切块、块的复制、块的存储等细节并不需要去关注,所有的工作都已被框架封装完毕。用户所需要的做的仅仅是将数据上传到HDFS。这大大简化了分布式文件存储操作的难度和管理的复杂度。
6、HDFS不能做到低延迟的数据访问(毫秒级内给出响应)。但是Hadoop的优势在于它的高吞吐率(吞吐率指的是:单位时间内产生的数据流)。可以说HDFS的设计是牺牲了低延迟的数据访问,而获取的是数据的高吞吐率。如果要想获取低延迟的数据访问,可以通过Hbase框架来实现。
7、HDFS不许修改数据,所以适用场景是:一次写入,多次读取(once-write-many-read)。注意:HDFS允许追加数据,但不允许修改数据。追加和修改的意义是不同的。
8、HDFS不支持并发写入,一个文件同一个时间只能有一个写入者。
9、HDFS不适合存储海量小文件,因为会浪费namenode服务节点的内存空间
大数据
2018-12-20 15:52:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
本文首发于 『阿里云 IoT 开发者社区』 ,更多精彩物联网内容欢迎前往浏览。
智能家居可谓是今年物联网的热门领域,通过智能单品和智能音箱,人们已然把『智能』两个字变成了生活的理所应当。搭建云上之家除了买买买,还能 DIY。依托阿里云物联网平台,我们用 30 行代码来搞定一套智能家居解决方案。
常见的智能家居解决方案包括了设备端、上云、应用端三大部分,更广的还涉及大数据及人工智能。传统的物联网开发非常强调流程性,即设备端、云、应用端三个步骤需要依次进行。而今天,依托于阿里云物联网平台的『物模型』基础,物联网开发的两端可以齐头并进,节省大量的人力物力成本。
齐头并进显然很诱人,但是能否再更进一步,一人 Handle 全部开发呢?答案是 YES!
目前,有大量互联网开发者由于缺乏嵌入式开发能力,如C/C++语言基础,止步于物联网蓝海的大门。通过阿里云 IoT 提供的 TinyEngine 引擎,可以快速使用 Javascript 进行设备端开发,完美解决这部分开发者的心头大患。而针对不熟悉前后端开发的嵌入式开发者,阿里云物联网平台一样提供了『可视化搭建应用』等快速上手的功能,零代码实现应用开发,大大减轻学习负担。
下面我们就使用阿里云物联网开发平台的 TinyEngine 引擎和可视化搭建功能,30 行代码快速开发一个由灯和温湿度计组成的智能家居系统。
一、开通服务
首先,申请阿里云账号,并开通登陆 Link Develop 一站式开发平台:https://linkdevelop.aliyun.com。
之后,新建项目(项目名任意)—— 设备开发 —— 新增产品 —— 所属分类按需选择『灯』或『温湿度计』,通讯方式选择 WiFi ,数据格式选择Alink —— 完成。
完成后选择『设备开发』标签页 —— 新增调试设备,记录下设备三元组。
二、设备开发
打开嵌入式 Javascript 在线工作台(没错,开发环境都不用搭建),创建新项目。替换 index.js 代码:
1. 灯 var deviceShadow = require('deviceShadow'); var ledHandle = GPIO.open("led1"); deviceShadow.bindDevID({ productKey: "", deviceName: "", deviceSecret: "" }); function main(err){ if(err){ console.log("连接平台失败"); }else{ console.log("主程序开始"); deviceShadow.addDevSetPropertyNotify("LightSwitch", function (lightStatus) { GPIO.write(ledHandle, 1-lightStatus); }); var mainLoop = setInterval(function () { var ledStatus = GPIO.read(ledHandle); deviceShadow.postProperty("LightSwitch", 1-ledStatus); }, 2000); } } deviceShadow.start(main);
2. 温湿度计 var deviceShadow = require('deviceShadow'); var shtc1 = require('shtc1'); var handle = new shtc1('shtc1'); var ledHandle = GPIO.open("led"); deviceShadow.bindDevID({ productKey: "a17vi82MmxP", deviceName: "0001", deviceSecret: "tYUngSMqYeDxODgtX3DNKkQ7920I3t4T" }); function main(err) { if (err) { console.log("连接平台失败"); } else { console.log("主程序开始"); var mainLoop = setInterval(function () { var val = handle.getTempHumi(); console.log('shtc1:temp=' + val[0] + ' humi:' + val[1]); deviceShadow.postProperty("CurrentTemperature", val[0]); deviceShadow.postProperty("RelativeHumidity", val[1]); }, 2000); } } deviceShadow.start(main);
将设备连接至电脑,点击『连接』并『运行』,设备启动后会自动加载并运行 index.js 这个文件,同时上报数据至阿里云物联网平台。
三、应用开发
既然是系统,没有应用可不行,我们利用可视化搭建功能 0 代码快速完成一个应用,只需依次拖入仪表盘和开关组件,替换图片,绑定设备即可完成全部操作。
齐活,短短 30 行代码搭建出的端到端智能家居系统就完成了,保存发布后就可以分享给他人访问了。
配合的 TinyEngine 引擎和可视化搭建,开发者无需学习新的编程语言,即可无缝快速切入物联网开发,也彰显了阿里云物联网平台的包容性和独创性。各位开发者,赶紧丢掉犹豫,上手试试吧!https://linkdevelop.aliyun.com
原文链接
大数据
2018-12-11 13:03:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> Fayson的github: https://github.com/fayson/cdhproject
1.文档编写目的
Gateway节点又称为客户端节点,通常用作访问Hadoop集群的接口机。它主要会部署一些客户端的配置,脚本命令,比如HDFS的core-site.xml,hdfs-site.xml以及hadoop的操作命令。
如果你使用的是Apache Hadoop,你只需要将hadoop相关服务的配置和脚本命令拷贝到客户端机器即可,但一旦集群的配置有所修改,你需要注意也同步到客户端机器。如果是CDH集群,客户端节点也会是Cloudera Manager管理的一台机器,它会被安装cloudera-scm-agent服务,以及CDH的Parcel,部署客户端配置Cloudera Manager会统一做,另外如果客户端机器出现异常,Cloudera Manager也会告警。
增加一台Gateway节点,与安装CDH非常类似,你必须要注意一定要做好客户端机器的前置条件准备,参考《CDH安装前置准备》,否则会增加失败。前面Fayson介绍过在非Kerberos环境下部署Gateway节点,参考《如何给CDH集群增加Gateway节点》。本文则主要是介绍如何在Kerberos环境下给CDH集群增加Gateway节点。
内容概述
1.创建Gateway节点的主机模板
2.Gateway节点的前置准备
3.增加Gateway节点到集群并应用主机模板
4.GateWay节点命令测试
测试环境
1.CDH5.13
2.采用root用户操作
3.CentOS6.5
前置条件
1.CDH5.13集群运行正常
2.Gateway节点已准备,并准备好前置
2.创建Gateway节点的主机模板
1.从Cloudera Manager进入“主机模板”页面
2.点击“创建”
3.给模板命名,点击各个服务勾选相应的GateWay角色
4.点击“创建”,确认创建成功.
3.Gateway节点的前置准备
前置准备请参考Fayson之前的文章《CDH安装前置准备》,主要包括以下步骤:
1.确保OS的yum源可以正常使用,通过yum repolist命令可以查看到匹配的OS的所有包
2.确保Cloudera Manager的yum源运行正常
3.hosts文件配置,需要将Gateway节点的IP和hostname加入到CDH集群节点的hosts文件中,并同步到所有机器包括Gateway节点
4.禁用SELinux
5.关闭防火墙
6.设置swap为10
7.关闭透明大页面
8.配置时钟同步
请务必确保以上操作都已完成,并成功配置,否则接下来的增加节点操作会失败!
4.安装Kerberos客户端
由于集群启用了Kerberos服务,所以需要在Gateway节点安装Kerberos客户端。
1.在Gateway节点上执行如下命令 [ec2-user@ip-172-31-31-212opt]$ sudo yum -y install krb5-libskrb5-workstation
安装成功后查看安装的RPM包 [ec2-user@ip-172-31-31-212opt]$ rpm -qa |grep krb krb5-workstation-1.15.1-8.el7.x86_64 krb5-libs-1.15.1-8.el7.x86_64 krb5-devel-1.15.1-8.el7.x86_64 [ec2-user@ip-172-31-31-212 opt]$
2.将CM集群中的krb5.conf文件拷贝至该Gateway节点 [ec2-user@ip-172-31-22-86 ~]$ scp -i fayson.pem.txt /etc/krb5.conf ip-172-31-31-212:/home/ec2-user/
3.在Gateway节点将krb5.conf文件拷贝至/etc目录下 [ec2-user@ip-172-31-31-212 ~]$ sudo cp krb5.conf /etc/ [ec2-user@ip-172-31-31-212 ~]$ sudo chown root. /etc/krb5.conf [ec2-user@ip-172-31-31-212 ~]$ ll /etc/krb5.conf -rw-r--r-- 1 root root 837 Dec 9 00:28 /etc/krb5.conf [ec2-user@ip-172-31-31-212 ~]$
4.在GateWay节点测试Kerberos客户端是否部署成功 [ec2-user@ip-172-31-31-212 ~]$ kinit -kt fayson.keytab fayson [ec2-user@ip-172-31-31-212 ~]$ klist Ticket cache: FILE:/tmp/krb5cc_1000 Default principal: fayson@CLOUDERA.COM Valid starting Expires Service principal 12/09/2017 00:31:53 12/10/2017 00:31:53 krbtgt/CLOUDERA.COM@CLOUDERA.COM renew until 12/16/2017 00:31:53 [ec2-user@ip-172-31-31-212 ~]$
有如上图所示则表示Kerberos客户端安装成功。
5.增加Gateway节点的集群并应用主机模板
1.进入“所有主机”页面
2.点击“向群集添加主机”
3.选择“经典向导”
4.继续
5.输入Gateway节点的IP或者hostname,点击搜索
6.点击“继续”,选择“自定义存储库”,并输入Cloudera Manager的yum源http地址
7.点击“继续”,勾选Java的两个选项
8.点击“继续”,输入Gateway节点的ec2-user密码
9.点击“继续”,等待cloudera-scm-agent在Gateway节点上安装
安装完成点击“继续”
10.点击“继续”,等待分发Parcel包并激活
完成后,点击“继续”
11.点击“继续”,进行主机检查
12.完成主机检查,点击“继续”,选择主机模板
13.点击“继续”,启动主机上的角色
等待执行成功
14.点击“继续”,部署客户端配置
15.点击完成,查看主机列表GateWay节点的角色信息
至此,给Kerberos环境下CDH集群增加新的Gateway节点完成。
6.Gateway节点测试
1.HDFS命令测试 [ec2-user@ip-172-31-31-212 ~]$ hadoop fs -ls /
2.HBase命令测试 [ec2-user@ip-172-31-31-212 ~]$ hbase shell
3.Hive命令测试 [ec2-user@ip-172-31-31-212 ~]$ hive
4.hadoop命令向集群提交作业 [ec2-user@ip-172-31-31-212 ~]$ hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 5 5
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
大数据
2018-12-06 00:46:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> sqoop import --connect jdbc:oracle:thin:@//*.*.*.*:1521/orcl --username * --password * --query " select TEST_REPORT_NO as ROWKEY,a.* from V_ESBHL_BLOOD_TRANSFU_ROD a where a.inspection_date >= '20180901' and a.inspection_date <= '20181201'
大数据
2018-12-04 10:27:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
在本系列 前面的文章 中,简单介绍了一下Ignite的k-最近邻(k-NN)分类算法,下面会尝试另一个机器学习算法,即使用泰坦尼克数据集介绍k-均值聚类算法。正好,Kaggle提供了CSV格式的 数据集 ,而要分析的是两个分类:即乘客是否幸存。
为了将数据转换为Ignite支持的格式,前期需要做一些清理和格式化的工作,CSV文件中包含若干个列,如下: 乘客Id 幸存(0:否,1:是) 船票席别(1:一,2:二,3:三) 乘客姓名 性别 年龄 泰坦尼克号上的兄弟/姐妹数 泰坦尼克号上的父母/子女数 船票号码 票价 客舱号码 登船港口(C=瑟堡,Q=皇后镇,S=南安普顿)
因此首先要做的是,删除任何和特定乘客有关的、和生存无关的列,如下: 乘客Id 乘客姓名 船票号码 客舱号码
接下来会删除任何数据有缺失的行,比如年龄或者登船港口,可以对这些值进行归类,但是为了进行初步的分析,会删除缺失的值。
最后会将部分字段转换为数值类型,比如性别会被转换为: 0:女 1:男
登船港口会被转换为: 0:Q(皇后镇) 1:C(瑟堡) 2:S(南安普顿)
最终的数据集由如下的列组成: 船票席别 性别 年龄 泰坦尼克号上的兄弟/姐妹数 泰坦尼克号上的父母/子女数 票价 登船港口 幸存
可以看到,幸存列已被移到最后。
下一步会将数据拆分为训练数据(80%)和测试数据(20%),和前文一样,还是使用Scikit-learn来执行这个拆分任务。
准备好训练和测试数据后,就可以编写应用了,本文的算法是: 读取训练数据和测试数据; 在Ignite中保存训练数据和测试数据; 使用训练数据拟合k-均值聚类模型; 将模型应用于测试数据; 确定含混矩阵和模型的准确性。
读取训练数据和测试数据
通过下面的代码,可以从CSV文件中读取数据: private static void loadData(String fileName, IgniteCache cache) throws FileNotFoundException { Scanner scanner = new Scanner(new File(fileName)); int cnt = 0; while (scanner.hasNextLine()) { String row = scanner.nextLine(); String[] cells = row.split(","); double[] features = new double[cells.length - 1]; for (int i = 0; i < cells.length - 1; i++) features[i] = Double.valueOf(cells[i]); double survivedClass = Double.valueOf(cells[cells.length - 1]); cache.put(cnt++, new TitanicObservation(features, survivedClass)); } }
该代码简单地一行行的读取数据,然后对于每一行,使用CSV的分隔符拆分出字段,每个字段之后将转换成double类型并且存入Ignite。
将训练数据和测试数据存入Ignite
前面的代码将数据存入Ignite,要使用这个代码,首先要创建Ignite存储,如下: IgniteCache trainData = getCache(ignite, "TITANIC_TRAIN"); IgniteCache testData = getCache(ignite, "TITANIC_TEST"); loadData("src/main/resources/titanic-train.csv", trainData); loadData("src/main/resources/titanic-test.csv", testData);
getCache() 的实现如下: private static IgniteCache getCache(Ignite ignite, String cacheName) { CacheConfiguration cacheConfiguration = new CacheConfiguration<>(); cacheConfiguration.setName(cacheName); cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 10)); IgniteCache cache = ignite.createCache(cacheConfiguration); return cache; }
使用训练数据拟合k-NN分类模型
数据存储之后,可以像下面这样创建训练器: KMeansTrainer trainer = new KMeansTrainer() .withK(2) .withDistance(new EuclideanDistance()) .withSeed(123L);
这里k的值配置为2,表示有2个簇(幸存和未幸存),对于距离测量,可以有多个选择,比如欧几里得、海明或曼哈顿,在本例中会使用欧几里得,另外,种子值赋值为123。
然后拟合训练数据,如下: KMeansModel mdl = trainer.fit( ignite, trainData, (k, v) -> v.getFeatures(), // Feature extractor. (k, v) -> v.getSurvivedClass() // Label extractor. );
Ignite将数据保存为键-值(K-V)格式,因此上面的代码使用了值部分,目标值是 Survived 类,特征在其它列中。
将模型应用于测试数据
下一步,就可以用训练好的分类模型测试测试数据了,可以这样做: int amountOfErrors = 0; int totalAmount = 0; int[][] confusionMtx = {{0, 0}, {0, 0}}; try (QueryCursor> cursor = testData.query(new ScanQuery<>())) { for (Cache.Entry testEntry : cursor) { TitanicObservation observation = testEntry.getValue(); double groundTruth = observation.getSurvivedClass(); double prediction = mdl.apply(new DenseLocalOnHeapVector(observation.getFeatures())); totalAmount++; if ((int) groundTruth != (int) prediction) amountOfErrors++; int idx1 = (int) prediction; int idx2 = (int) groundTruth; confusionMtx[idx1][idx2]++; System.out.printf(">>> | %.4f\t | %.0f\t\t\t|\n", prediction, groundTruth); } }
确定含混矩阵和模型的准确性
下面,就可以通过对测试数据中的真实分类和模型进行的分类进行对比,来确认模型的真确性。
代码运行之后,输出如下: >>> Absolute amount of errors 56 >>> Accuracy 0.6084 >>> Precision 0.5865 >>> Recall 0.9873 >>> Confusion matrix is [[78, 55], [1, 9]]
这个初步的结果可不可以改进?可以尝试的是对特征的衡量,在Ignite和Scikit-learn中,可以使用 MinMaxScaler() ,然后会给出如下的输出: >>> Absolute amount of errors 29 >>> Accuracy 0.7972 >>> Precision 0.8205 >>> Recall 0.8101 >>> Confusion matrix is [[64, 14], [15, 50]]
作为进一步分析的一部分,还应该研究幸存与否和年龄和性别之间的关系。
总结
通常来说,k-均值聚类并不适合监督学习任务,但是如果分类很容易,这个方法还是有效的。对于本例来说,关注的就是是否幸存。
大数据
2018-12-03 19:49:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
一、kafka单节点单broker环境搭建
系统环境:centos 64位 mini
虚拟机:Virtual Box
如果你不熟悉virtual box的使用,不知道如何安装centos系统,可以参考之前发布的文章:
快速掌握基于Virtualbox最小化安装centos6及远程工具xshell使用
快速掌握virtualbox虚拟机搭建centos系统集群
如果你不想用virtual box,也可以使用docker,之前发布的文章也有介绍,请进入头条号查看
1、Windows下搭建单节点kafka环境
请参考之前发布在头条号上的文章,已经详细介绍了
1)JDK环境搭建
快速搭建Java(JDK)Tomcat基础开发环境
2)zookeeper环境搭建
Windows下安装zookeeper
3)kafka环境搭建
Windows下安装Kafka及简单使用
2、Linux下搭建
1)JDK环境搭建
下载: http://download.oracle.com/otn-pub/java/jdk/8u181-b13/96a7b8442fe848ef90c96a2fad6ed6d1/jdk-8u181-linux-x64.tar.gz
解压:
cd /usr/local/src tar -zxvf jdk-8u181-linux-x64.tar.gz mv jdk1.8.0_181 ../
配置:
cd /usr/local/jdk1.8.0_181 vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_181 export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile
验证: java -version
2)zookeeper环境搭建 下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz 解压
cd /usr/local/src tar -zxvf zookeeper-3.4.12.tar.gz mv zookeeper-3.4.12 ../ 配置环境变量
cd /usr/local/zookeeper-3.4.12 mkdir data mkdir logs vim /etc/profile
export ZK_HOME=/usr/local/zookeeper-3.4.12 export PATH=$ZK_HOME/bin:$PATH
source /etc/profile 修改zoo.cfg
cp conf/zoo_sample.cfg conf/zoo.cfg vim conf/zoo.cfg dataDir=/usr/local/zookeeper-3.4.12/data dataLogDir=/usr/local/zookeeper-3.4.12/logs 启动测试
zkServer.sh start
[root@jikeh zookeeper-3.4.12]# zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 查看启动进程信息
[root@jikeh local]# jps 3829 QuorumPeerMain 3848 Jps ​ [root@jikeh local]# jps -m 3858 Jps -m 3829 QuorumPeerMain /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
注释:zookeeper启动后,会多一个进程QuorumPeerMain
3)kafka环境搭建 下载:
cd /usr/local/src wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
注释:我们使用的版本是0.9.0.1 解压:
tar -zxvf kafka_2.11-0.9.0.1.tgz mv kafka_2.11-0.9.0.1 ../ 配置环境变量:
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-0.9.0.1 export PATH=$KAFKA_HOME/bin:$PATH
使得环境变量立即生效:source /etc/profile 配置server.properties
cd /usr/local/kafka_2.11-0.9.0.1 vim conf/server.properties
server.properties重要参数说明:
broker.id=0//This must be set to a unique integer for each broker advertised.listeners=PLAINTEXT://192.168.0.108:9092:远程连接需要配置下(0.9.0.1版本没有这个问题) log.dirs=/usr/local/kafka_2.11-0.9.0.1/kafka-logs//默认的是不会持久化存储的,这里必须更改下 zookeeper.connect=//zookeeper的连接地址:根据实际进行配置
3)启动测试
配置你的hosts:
127.0.0.1 jikeh
启动zookeeper:
Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。 您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
cd /usr/local/kafka_2.11-0.9.0.1 bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka: bin/kafka-server-start.sh config/server.properties
后台启动: bin/kafka-server-start.sh -daemon config/server-1.properties &
注释:一个server.properties其实,就可以当做一个broker
jps:查看当前运行的进程
jps -m:查看当前运行进程的详细信息
二、kafka基本使用
1、创建Topic
1)新建topic
查看命令的帮助信息:直接输入bin/kafka-topics.sh,然后enter,就可以查看所有的参数信息 bin/kafka-topics.sh --create --zookeeper 192.168.0.108:2181 --replication-factor 1 --partitions 1 --topic test
注释:参数说明 replication-factor:设置副本数 partitions:设置分区数
2)查看当前所有的topic列表 bin/kafka-topics.sh --list --zookeeper 192.168.0.108:2181
3)看下topic的详情信息
查看所有topic: bin/kafka-topics.sh --describe --zookeeper 192.168.0.108:2181
查看指定topic: bin/kafka-topics.sh --describe --zookeeper 192.168.0.108:2181 --topic test
2、发送消息 bin/kafka-console-producer.sh --broker-list 192.168.0.108:9092 --topic test
3、消费消息 bin/kafka-console-consumer.sh --zookeeper 192.168.0.108:2181 --topic test --from-beginning
[root@jikeh kafka_2.11-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
上面的警告信息,表示:这是个废弃的用法,建议使用新的用法[bootstrap-server] bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.108:9092 --topic test
注释: --from-beginning 参数的作用:重新开始消费消息,不管是否已经被消费了 对于低版本的kafka需要这样连接: bin/kafka-console-consumer.sh --zookeeper 192.168.0.108:2181 --topic test --from-beginning
4、疑惑点
通过以上命令,很多人就有疑问了,到底什么时候跟zookeeper打交道,什么时候跟broker打交道呢,这很容易让人糊涂
创建topic我是跟zookeeper打交道,而生产、消费消息却与broker打交道,但是低版本的kafka,消费者还是与zookeeper进行打交道的
注意:
低版本的kafka:需要这样消费 bin/kafka-console-consumer.sh --zookeeper 192.168.0.108:2181 --topic test --from-beginning
高版本的kafka: bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.108:9092 --topic test --from-beginning
注释:官网上的文档,其实是按高版本的kafka来更新文档的
为什么会有这种区别呢?
Kafka consumers在早先的版本中,offset默认存储在ZooKeeper中。
三、kafka客户端远程连接测试
如果你理解上有困难,欢迎留言,或者参考视频教程:
带你看官网学习Kafka,掌握Linux下Kafka环境搭建及基本使用
参考 Kafka bootstrap-servers vs zookeeper in kafka-console-consumer 中说建议使用新版(新版本指的是kafka 0.8.0之后的版本)的 --bootstrap-server
大数据
2018-11-26 00:02:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
统计学习三要素
模型

策略



大数据
2018-11-23 22:43:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> 1、在GitHub上下载pyspark2pmml-master.zip压缩包,下载链接https://github.com/jpmml/pyspark2pmml 2、解压pyspark2pmml-master.zip 3、进入pyspark2pmml-master目录,输入python setup.py build 4、然后再输入python setup.py install 5、进入/app/spark/python/lib,解压 py4j-0.10.4-src.zip和pyspark.zip 6、将解压后的py4j和pyspark复制到/root/anaconda2/lib/python2.7/site-packages中 cp -r /app/spark/python/lib/py4j /root/anaconda2/lib/python2.7/site-packages cp -r /app/spark/python/lib/pyspark /root/anaconda2/lib/python2.7/site-packages 7、使用./pyspark --jars /root/jpmml-sparkml-executable-1.3.9.jar启动pyspark 8、然后执行算法模型代码
大数据
2018-11-20 18:29:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.sql.Row // Prepare training data from a list of (id, text, label) tuples. val training = (spark.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0), (4L, "b spark who", 1.0), (5L, "g d a y", 0.0), (6L, "spark fly", 1.0), (7L, "was mapreduce", 0.0), (8L, "e spark program", 1.0), (9L, "a e c l", 0.0), (10L, "spark compile", 1.0), (11L, "hadoop software", 0.0) )).toDF("id", "text", "label")) // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = (new Tokenizer() .setInputCol("text") .setOutputCol("words")) val hashingTF = (new HashingTF() .setInputCol(tokenizer.getOutputCol) .setOutputCol("features")) val lr = (new LogisticRegression() .setMaxIter(10)) val pipeline = (new Pipeline() .setStages(Array(tokenizer, hashingTF, lr))) // We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. val paramGrid = (new ParamGridBuilder() .addGrid(hashingTF.numFeatures, Array(10, 100, 1000)) .addGrid(lr.regParam, Array(0.1, 0.01)) .build()) // We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. // This will allow us to jointly choose parameters for all Pipeline stages. // A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. // Note that the evaluator here is a BinaryClassificationEvaluator and its default metric // is areaUnderROC. val cv = (new CrossValidator() .setEstimator(pipeline) .setEvaluator(new BinaryClassificationEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(3)) // Run cross-validation, and choose the best set of parameters. val cvModel = cv.fit(training) // Prepare test documents, which are unlabeled (id, text) tuples. val test = spark.createDataFrame(Seq( (12L, "spark i j k"), (13L, "l m n"), (14L, "mapreduce spark"), (15L, "apache hadoop") )).toDF("id", "text") // Make predictions on test documents. cvModel uses the best model found (lrModel). (cvModel.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction") }) //output /* scala> (cvModel.transform(test) | .select("id", "text", "probability", "prediction") | .collect() | .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => | println(s"($id, $text) --> prob=$prob, prediction=$prediction") | }) (12, spark i j k) --> prob=[0.25806842225846466,0.7419315777415353], prediction=1.0 (13, l m n) --> prob=[0.9185597412653913,0.08144025873460858], prediction=0.0 (14, mapreduce spark) --> prob=[0.43203205663918753,0.5679679433608125], prediction=1.0 (15, apache hadoop) --> prob=[0.6766082856652199,0.32339171433478003], prediction=0.0 */
大数据
2018-11-20 18:28:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
摘要: 2009年,发生了两件看似不起眼的事。初春刚过,阿里云在北京一栋没有暖气的写字楼写下了飞天第一行代码。同年11月11日,淘宝商城启动了一个叫做双11的促销活动。谁也没想到,多年以后他们会是现在这模样。
2009年,发生了两件看似不起眼的事。
初春刚过,阿里云在北京一栋没有暖气的写字楼写下了飞天第一行代码。
同年11月11日,淘宝商城启动了一个叫做双11的促销活动。
谁也没想到,多年以后他们会是现在这模样。
前传
2007年淘宝的交易额突破了400亿,技术团队却喜忧参半:现有集中式架构已经捉襟见肘,该如何应对?
随即启动的分布式改造获得巨大成功,淘宝所有的业务都做了模块化。
尽管这次技术升级在现在看来存在诸多局限性:更多的是为了应对不断增多的图片缓存,在CDN端对海量的图片缓存做了一些限流操作。但就是这次改造成就了历史上第一个双11,2009年双11当天的流量达到了一个高峰,在旧的架构体系下,这几乎是一项不可能完成的任务。
一年后,飞天的第一个版本上线,仅有几十台机器的集群开始服务第一个内部客户——阿里金融。
这些探索让所有人看到了一个趋势:用分布式的方法将传统、昂贵的小型机硬件和软件,替换成通用的X86的服务器集群,虚拟化等技术可以让计算能够按量、随时被启动。
流量狂奔:脉冲计算
从计算机诞生到90年代,计算资源都是作为“可计划性”的资源来使用。无论是探索月球,还是研究基因的奥秘,计算资源本身是可规划的。然而,互联网时代的到来,一个爆发性事件,就有可能挑战整个计算资源,不确定性如影随形。
毫无疑问,双11就是这样一个典型场景。
2011双11的流量洪峰给技术团队留下了午夜惊魂般难忘的回忆:系统性技术问题,使大量商家可能出现超卖。
事后,这群国内互联网行业最优秀的技术人开始意识到这已经不仅仅是一场商业促销活动,更是一次技术大考。
次年,天猫与阿里云、万网宣布联合推出聚石塔平台,率先以云计算为“塔基”,为天猫、淘宝平台上的电商及电商服务商提供IT基础设施。
基于阿里云的ECS云服务器、RDS云数据库、SLB负载均衡网络,商家订单源源不断的推送到商家平台上,保障了数据的稳定和延续。这是云计算首次参与双11,并且实现了191亿的交易总额。
飞天的高速成长也给了所有人信心:5年的时间,飞天平台集群规模从1500台到3000台,2013年8月,单集群超越5000台,同时支持多集群跨机房计算……
接下来几年,云计算逐渐成为这场狂欢节中的基石,到2014年,聚石塔上96%的交易使用了阿里云。
2015年开始,双11构建了全球最大的混合云,将公共云和专有云无缝连接。
而这些成果最终成为了全社会的能力:无论是春节在12306抢票,还是在世界杯期间的狂欢,抑或是微博上的明星八卦,脉冲计算的能力在今天已经变成了常态。
数据平台,计算创造价值
2013年前后,“船票论”在互联网圈极为流行。“船票”成为了人们对于一家公司有没有未来的判断依据。至于如何拿到这张船票,彼时都非常模糊。
然而,移动端带来的海量数据却是真实存在的,移动端带来的新需求,也推动了阿里云大数据平台的发展。
鲜为人知的是,2004年,为了从大数据中分析当前状况,并预测未来的趋势,阿里就拥有了第一个大数据仓库,到2008年,淘宝的业务量与数据量已达到2004年的数千倍,为了让数据成为生产资料,从底层重构大数据技术就显得尤为重要。
2010年春,大数据引擎MaxCompute的前身SQL Engine第一版上线,运行在当时30台机器的飞天集群上。
2011年,飞天团队开始探索支撑集团内部数仓业务,在1500台机器上并行运行云梯2的生产作业,并取得了不输于Hadoop的性能和稳定性成绩。
飞天5K项目之后,MaxCompute随之进入5000台机器和跨机房调度时代,并且可以在377秒完成100TB的排序。
通过MaxCompute引擎助力,从2014年双11开始,千人千面成为常态。现在,这一产品已经成为阿里巴巴的主力计算平台:
l阿里巴巴集团99%的数据存储以及95%的计算能力使用MaxCompute;
l每天有超过500万个作业在平台上运行;
l2017年双11,MaxCompute单日数据处理超过320PB。
几年过去,“船票论”烟消云散,然而大数据平台带来的价值已显山露水,通过大数据与人工智能算法的结合,MaxCompute已成为ET大脑最重要的组成部分,ET城市大脑自动指挥城市路口信号灯,并且成功应用于马来西亚的交通治理;ET工业大脑推动工厂取得更好的良品率,致力于帮助制造企业提升1%的良品率。
从数字化阿里到数字化城市,MaxCompute可以承载EB级的数据存储能力,成为全球首个100TB规模Bigbench测试通过的大数据计算平台;在公共云上覆盖国内外的十几个国家和地区,专有云上部署超过100+套。
史上最大规模的人机协同
2017年的双11被认为是人类历史上最大规模的人机协同“超级工程”:技术运维、商品推荐、客服、支付、物流等各个环节都引入机器智能。
数据、计算力、算法是人工智能的三大核心要素。飞天在通用计算服务的基础上,开始探索面向人工智能的异构计算。
2017年9月12日,阿里云宣布推出全新一代异构计算加速平台,在行业内第一次覆盖了包括AMD、NVIDIA的GPU和Intel、XILINX的FPGA在内的所有6款主流异构实例,提供最高可达75TFLOPS的算力。
全新的基础设施让全球最大规模的人机协同成为了可能。双11大量AI和视频转码业务部署在ECS的GPU集群之上,包括视频云的视频智能处理、阿里小蜜、拍立淘和新零售的智能供应链管理等AI业务都通过阿里云异构计算GPU昊天平台进行加速。
 阿里巴巴数据中心机器人“天巡”每天在机房巡逻,能接替运维人员以往30%的重复性工作。 AI调度官“达灵”将数据中心资源分配率拉升到90%以上。  人工智能助手‘阿里小蜜’在双11当天承担95%的客服咨询。  菜鸟智慧货仓机器人单日可发货超过100万件。  AI设计师“鹿班”,在双11期间设计了4.1亿张商品海报。  阿里机器智能推荐系统双11当天为用户生成超过567亿个专属“货
架”,像智能导购员一样,给消费者“亿人亿面”的个性化推荐。
半年后的武汉云栖大会上,阿里云首次和合作伙伴一起展出了AI智能点单设备,在没有任唤醒词的情况下,客户以每秒5个字的速度,向一台机器点单,并频繁更换语句,这台机器对每次对话均作出了精准应答。
现在,这些基础设施以及商业化的产物正服务于各行各业。
飞天2.0支撑2018全社会的双11
如果说最初的技术变革是为了解决平台的流量峰值问题,那么现在新的篇章已经开启,新技术正在引领商业变革。
今年双11期间,阿里云上新增调用的弹性计算能力累计超过1000万核,相当于10座大型数据中心,创造了“脉冲计算”的新纪录。不仅如此,飞天2.0的新能力全面支撑双11: 阿里云自研神龙弹性裸金属服务器在核心系统中发挥巨大作用。其基于完全自研的新一代软硬融合的X-Dragon虚拟化架构,兼具物理机和虚拟机优势,解决了高峰值流量下的性能瓶颈。  业界首个百万级IOPS的ESSD云盘提供了数十PB的存储规模,应对了史上最大的高并发IO挑战。  猫晚网络直播创下带宽峰值记录,阿里云视频云高可靠直播方案完美保障了优酷2500万用户的体验。  CDN为中国1/3以上的互联网流量提供加速,视频云在海外为Lazada提供直播服务。  国内首次大规模IPv6商业应用实践,云、网、端以及应用全面支持IPv6。  实时计算Blink处理峰值达到每秒17.18亿条,相当于120万本新华字典的数据量。  MaxCompute单日数据处理超过500PB,平稳支撑电商混布单元在线流量洪峰12万笔/秒交易。  云盾为云上客户提供上千万次风险识别服务,并将DDoS高防技术输出到全球,保障全球业务。
……
在这些技术底座之上,整个系统应对流量洪峰更加从容,当天交易额达到创纪录的2135亿。
当然,IoT的登场则为双11创造了更大的想象空间。
在消费侧,IoT技术服务新零售的同时,正在催生新的追踪经济。消费者可以查看进口商品完整的商品溯源实时信息,确保天猫进口商品安全可溯,远洋运输也实时可见。
在制造侧,助力天猫品牌服装厂商数字化接单、下单和备货,个性化生产、柔性化制造,交期准确率近乎100%;对农业生产、运输、销售进行全链路升级。
从线上到线下,从生产制造到物流配送,从国内到海外,阿里云的技术能力延伸到各行各业。而这只是这些新技术应用的冰山一角,云与AI、IoT的有机融合未来还会产生什么化学反应?我们拭目以待。
结语
从27个品牌参加成交额不过数千万元开始,到现在的千亿级体量,双11已经成为一个未来商业实践的范本,也是新技术的最大试验场,这些新技术逐渐变成全社会的基础能力,撬动一场全球的社会大协作。
原文链接
大数据
2018-11-20 14:48:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
1.文档编写目的
MySQL数据库自身提供的主从复制功能可以方便的实现数据的多处自动备份,实现数据库的扩展。多个数据备份不仅可以加强数据的安全性,通过实现读写分离还能进一步提升数据库的负载性能。本文档讲述如何实现MySQL主从复制。注:本文档实现的MySQL主备模式为Active-Passive而不是Active-Active,如果使用双活的方式,建议企业内部配备MySQL的DBA来维护MySQL。CDH集群在运行过程中,MySQL的负载并不会太高,推荐的方式是Active-Passive模式,以降低维护成本和维护难度。 内容概述
1.Master和Slave配置
2.构建主从复制
3.主从复制验证 测试环境
1.两台Linux服务器(172.31.10.118(主)/172.31.5.190),操作系统为CentOS6.5
2.MySQL5.1.73
3.采用root用户操作 前置条件
1.两个MySQL版本必须一致
2.两个MySQL已安装好,且没有任何数据
3.主MySQL必须开启bin-log日志
2.MySQL主从复制
2.1Master和Slave配置
配置文件说明:
log-bin:开启二进制日志,日志文件前缀
server-id:数据库服务的唯一标识确保标识不重复,一般设置为服务器ip的末尾数
binlog-format:设置Mysql binlog记录日志的格式(格式含:Statement、MIXED、ROW),MySQL默认使用的是Statement,推荐使用MIXED。 Master主服务配置(172.31.10.118)
修改/etc/my.conf文件,增加如下配置 [root@ip-172-31-10-118 cloudera-scm-server]# vim /etc/my.cnf [mysqld] datadir=/var/lib/mysql socket=/var/lib/mysql/mysql.sock user=mysql # Disabling symbolic-links is recommended to prevent assorted security risks symbolic-links=0 log-bin=mysql-bin server-id=118 binlog_format=MIXED [mysqld_safe] log-error=/var/log/mysqld.log pid-file=/var/run/mysqld/mysqld.pid
Slave服务配置(172.31.5.190)
修改/etc/my.conf文件,增加如下配置 [root@ip-172-31-5-190 ~]# vim /etc/my.cnf [mysqld] datadir=/var/lib/mysql socket=/var/lib/mysql/mysql.sock user=mysql # Disabling symbolic-links is recommended to prevent assorted security risks symbolic-links=0 log-bin=mysql-bin server-id=190 [mysqld_safe] log-error=/var/log/mysqld.log pid-file=/var/run/mysqld/mysqld.pid
修改配置后重启Master和Slave的MySQL服务。 [root@ip-172-31-5-190 ~]# service mysqld restart Stopping mysqld: [ OK ] Starting mysqld: [ OK ] [root@ip-172-31-5-190 ~]#
2.1构建主从复制 在Master(172.31.10.118) 主MySQL上创建一个mysnc用户
用户名:mysync 密码:mysync GRANT REPLICATION SLAVE ON *.* TO 'mysync'@'172.31.%' IDENTIFIED BY 'mysync'; FLUSH PRIVILEGES;
mysync用户必须具有REPLICATION SLAVE权限。说明一下172.31.%,这个配置是指明mysync用户所在服务器,这里%是通配符,表示IP以172.31开头的Server都可以使用mysync用户登陆Master主服务器。也可以指定固定IP。
命令行操作 [root@ip-172-31-10-118 ~]# mysql -uroot -p Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 11 Server version: 5.1.73-log Source distribution Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> GRANT REPLICATION SLAVE ON *.* TO 'mysync'@'172.31.%' IDENTIFIED BY 'mysync'; Query OK, 0 rows affected (0.00 sec) mysql> FLUSH PRIVILEGES; Query OK, 0 rows affected (0.00 sec)
2.查看Master(172.31.10.118) MySQL二进制日志File与Position mysql> show master status; +------------------+----------+--------------+------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ | mysql-bin.000003 | 1121 | | | +------------------+----------+--------------+------------------+ 1 row in set (0.00 sec) mysql>
3.在Slave从MySQL上执行如下SQL change master to master_host='172.31.10.118', master_user='mysync', master_password='mysync', master_log_file='mysql-bin.000003', master_log_pos=1121;
命令行执行 [root@ip-172-31-5-190 ~]# mysql -uroot -p ... Server version: 5.1.73-log Source distribution ... mysql> change master to -> master_host='172.31.10.118', -> master_user='mysync', -> master_password='mysync', -> master_log_file='mysql-bin.000003', -> master_log_pos=1121; Query OK, 0 rows affected (0.02 sec) mysql>
4.在Slave从MySQL上执行命令,启动同步 mysql> start slave; Query OK, 0 rows affected (0.00 sec) mysql>
5.在Slave MySQL上查看Slave状态 mysql> show slave status \G *************************** 1. row *************************** Slave_IO_State: Waiting for master to send event Master_Host: 172.31.10.118 Master_User: mysync Master_Port: 3306 Connect_Retry: 60 Master_Log_File: mysql-bin.000003 Read_Master_Log_Pos: 1121 Relay_Log_File: mysqld-relay-bin.000002 Relay_Log_Pos: 251 Relay_Master_Log_File: mysql-bin.000003 Slave_IO_Running: Yes Slave_SQL_Running: Yes Replicate_Do_DB: Replicate_Ignore_DB: Replicate_Do_Table: Replicate_Ignore_Table: Replicate_Wild_Do_Table: Replicate_Wild_Ignore_Table: Last_Errno: 0 Last_Error: Skip_Counter: 0 Exec_Master_Log_Pos: 1121 Relay_Log_Space: 407 Until_Condition: None Until_Log_File: Until_Log_Pos: 0 Master_SSL_Allowed: No Master_SSL_CA_File: Master_SSL_CA_Path: Master_SSL_Cert: Master_SSL_Cipher: Master_SSL_Key: Seconds_Behind_Master: 0 Master_SSL_Verify_Server_Cert: No Last_IO_Errno: 0 Last_IO_Error: Last_SQL_Errno: 0 Last_SQL_Error: 1 row in set (0.00 sec) mysql>
注意:上图标注部分显示为“Yes”则表示主从复制构建成功。
2.3主从复制验证 登录Master主MySQL上执行SQL
2.登录Slave从MySQL上执行SQL
3.在Master主MySQL上执行SQL mysql> create database test; Query OK, 1 row affected (0.00 sec) mysql> use test; Database changed mysql> create table table1(id int, name varchar(32)); Query OK, 0 rows affected (0.01 sec) mysql> show databases; +--------------------+ | Database | +--------------------+ | information_schema | | mysql | | test | +--------------------+ 3 rows in set (0.00 sec) mysql> show tables; +----------------+ | Tables_in_test | +----------------+ | table1 | +----------------+ 1 row in set (0.00 sec) mysql>
4.在Slave从MySQL上执行SQL查看 mysql> show databases; +--------------------+ | Database | +--------------------+ | information_schema | | mysql | | test | +--------------------+ 3 rows in set (0.00 sec) mysql> use test; Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> show tables; +----------------+ | Tables_in_test | +----------------+ | table1 | +----------------+ 1 row in set (0.00 sec) mysql>
通过上述测试,Master主MySQL创建的库和表都正常的同步到Slave从MySQL。
3.备注 如何停止并删除主从同步,在Slave从MySQL上执行如下SQL mysql> stop slave; Query OK, 0 rows affected (0.00 sec) mysql> reset slave; Query OK, 0 rows affected (0.00 sec) mysql>
注意:执行上述操作后,需要重启MySQL服务。 MySQL配置参数说明
参考文档: http://blog.csdn.net/wlzx120/article/details/52301383
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
大数据
2018-11-20 13:36:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
什么是分布式系统
回到顶部
  分布式系统是由一组通过网络进行通信、为了完成共同的任务而协调工作的计算机节点组成的系统。分布式系统的出现是为了用廉价的、普通的机器完成单个计算机无法完成的计算、存储任务。其目的是 利用更多的机器,处理更多的数据 。
  首先需要明确的是,只有当单个节点的处理能力无法满足日益增长的计算、存储任务的时候,且硬件的提升(加内存、加磁盘、使用更好的CPU)高昂到得不偿失的时候,应用程序也不能进一步优化的时候,我们才需要考虑分布式系统。因为,分布式系统要解决的问题本身就是和单机系统一样的,而由于分布式系统多节点、通过网络通信的拓扑结构,会引入很多单机系统没有的问题,为了解决这些问题又会引入更多的机制、协议,带来更多的问题。。。
  在很多文章中,主要讲分布式系统分为分布式计算(computation)与分布式存储(storage)。计算与存储是相辅相成的,计算需要数据,要么来自实时数据(流数据),要么来自存储的数据;而计算的结果也是需要存储的。在操作系统中,对计算与存储有非常详尽的讨论,分布式系统只不过将这些理论推广到多个节点罢了。
  那么分布式系统怎么将任务分发到这些计算机节点呢,很简单的思想,分而治之,即分片( partition) 。对于计算,那么就是对计算任务进行切换,每个节点算一些,最终汇总就行了,这就是MapReduce的思想;对于存储,更好理解一下,每个节点存一部分数据就行了。当数据规模变大的时候,Partition是唯一的选择,同时也会带来一些好处:
  (1)提升性能和并发,操作被分发到不同的分片,相互独立
  (2)提升系统的可用性,即使部分分片不能用,其他分片不会受到影响

  理想的情况下,有分片就行了,但事实的情况却不大理想。原因在于,分布式系统中有大量的节点,且通过网络通信。单个节点的故障(进程crash、断电、磁盘损坏)是个小概率事件,但整个系统的故障率会随节点的增加而指数级增加,网络通信也可能出现断网、高延迟的情况。在这种一定会出现的“异常”情况下,分布式系统还是需要继续稳定的对外提供服务,即需要较强的容错性。最简单的办法,就是冗余或者复制集( Replication ),即多个节点负责同一个任务,最为常见的就是分布式存储中,多个节点复杂存储同一份数据,以此增强可用性与可靠性。同时,Replication也会带来性能的提升,比如数据的locality可以减少用户的等待时间。
大数据
2018-11-17 08:56:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
摘要: Hash Clustering通过允许用户在建表时设置表的Shuffle和Sort属性,进而MaxCompute根据数据已有的存储特性,优化执行计划,提高效率,节省资源消耗。 对于Hash Clustering整体带来的性能收益,我们通过标准的TPC-H测试集进行衡量。
背景
在 MaxCompute 查询中,Join是很常见的场景。例如以下Query,就是一个简单的Inner Join把t1表和t2表通过id连接起来:
SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;
Join在MaxCompute内部主要有三种实现方法:
Broadcast Hash Join - 当Join存在一个很小的表时,我们会采用这种方式,即把小表广播传递到所有的Join Task Instance上面,然后直接和大表做Hash Join。
Shuffle Hash Join - 如果Join表比较大,我们就不能直接广播了。这时候,我么可以把两个表按照Join Key做Hash Shuffle,由于相同的键值Hash结果也是一样的,这就保证了相同的Key的记录会收集到同一个Join Task Instance上面。然后,每个Instance对数据量小的一路建Hash表,数据量大的顺序读取Join。
Sort Merge Join - 如果Join的表更大一些,#2的方法也用不了,因为内存已经不足以容纳建立一个Hash Table。这时我们的实现方法是,先按照Join Key做Hash Shuffle,然后再按照Join Key做排序,最后我们对Join双方做一个归并,具体流程如下图所示:
实际上对于 MaxCompute 今天的数据量和规模,我们绝大多数情况下都是使用的Sort Merge Join,但这其实是非常昂贵的操作。从上图可以看到,Shuffle的时候需要一次计算,并且中间结果需要落盘,后续Reducer读取的时候,又需要读取和排序的过程。对于M个Mapper和R个Reducer的场景,我们将产生M x R次的IO读取。对应的Fuxi物理执行计划如下所示,需要两个Mapper Stage,一个Join Stage,其中红色部分为Shuffle和Sort操作:
与此同时,我们观察到,有些Join是可能反复发生的,比如上面的Query改成了:
SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;
虽然,我们选择的列不一样了,但是底下的Join是完全一样的,整个Shuffle和Sort的过程也是完全一样的。
又或者:
SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;
这个时候是t1和t3来Join,但实际上对于t1而言,整个Shuffle和Sort过程还是完全一样。
于是,我们考虑,如果我们初始表数据生成时,按照Hash Shuffle和Sort的方式存储,那么后续查询中将避免对数据的再次Shuffle和Sort。这样做的好处是,虽然建表时付出了一次性的代价,却节省了将来可能产生的反复的Shuffle和Join。这时Join的Fuxi物理执行计划变成了如下所示,不仅节省了Shuffle和Sort的操作,并且查询从3个Stage变成了1个Stage完成:
所以,总结来说,Hash Clustering通过允许用户在建表时设置表的Shuffle和Sort属性,进而 MaxCompute 根据数据已有的存储特性,优化执行计划,提高效率,节省资源消耗。

功能描述
目前Hash Clustering功能已经上线,缺省条件下即打开支持。 创建Hash Clustering Table
用户可以使用以下语句创建Hash Clustering表。用户需要指定Cluster Key(即Hash Key),以及Hash分片(我们称之为Bucket)的数目。Sort是可以选项,但在大多数情况下,建议和Cluster Key一致,以便取得最佳的优化效果。
CREATE TABLE [IF NOT EXISTS] table_name [(col_name data_type [comment col_comment], ...)] [comment table_comment] [PARTITIONED BY (col_name data_type [comment col_comment], ...)]
[CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]
[AS select_statement]
举个例子如下:
CREATE TABLE T1 (a string, b string, c bigint) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
如果是分区表,则可以用这样的语句创建:
CREATE TABLE T1 (a string, b string, c bigint) PARTITIONED BY (dt string) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
CLUSTERED BY
CLUSTERED BY指定Hash Key,MaxCompute将对指定列进行Hash运算,按照Hash值分散到各个Bucket里面。为避免数据倾斜,避免热点,取得较好的并行执行效果,CLUSTERED BY列适宜选择取值范围大,重复键值少的列。此外,为了达到Join优化的目的,也应该考虑选取常用的Join/Aggregation Key,即类似于传统数据库中的主键。
SORTED BY
SORTED BY子句用于指定在Bucket内字段的排序方式,建议Sorted By和Clustered By一致,以取得较好的性能。此外,当SORTED BY子句指定之后,MaxCompute将自动生成索引,并且在查询的时候利用索引来加快执行。
INTO number_of_buckets BUCKETS
INTO ... BUCKETS 指定了哈希桶的数目,这个数字必须提供,但用户应该由数据量大小来决定。Bucket越多并发度越大,Job整体运行时间越短,但同时如果Bucket太多的话,可能导致小文件太多,另外并发度过高也会造成CPU时间的增加。目前推荐设置让每个Bucket数据大小在500MB - 1GB之间,如果是特别大的表,这个数值可以再大点。
目前,MaxCompute只能在Bucket Number完全一致的情况下去掉Shuffle步骤,我们下一个发布,会支持Bucket的对齐,也就是说存在Bucket倍数关系的表,也可以做Shuffle Remove。为了将来可以较好的利用这个功能,我们建议Bucket Number选用2的N次方,如512,1024,2048,最大不超过4096,否则影响性能以及资源使用。
对于Join优化的场景,两个表的Join要去掉Shuffle和Sort步骤,要求哈希桶数目一致。如果按照上述原则计算两个表的哈希桶数不一致,怎么办呢?这时候建议统一使用数字大的Bucket Number,这样可以保证合理的并发度和执行效率。如果表的大小实在是相差太远,那么Bucket Number设置,可以采用倍数关系,比如1024和256,这样将来我们进一步支持哈希桶的自动分裂和合并时,也可以利用数据特性进行优化。 更改表属性
对于分区表,我们支持通过ALTER TABLE语句,来增加或者去除Hash Clustering属性:
ALTER TABLE table_name [CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS
ALTER TABLE table_name NOT CLUSTERED;

关于ALTER TABLE,有几点需要注意:
alter table改变聚集属性,只对于分区表有效,非分区表一旦聚集属性建立就无法改变。
alter table只会影响分区表的新建分区(包括insert overwrite生成的),新分区将按新的聚集属性存储,老的数据分区保持不变。
由于alter table只影响新分区,所以该语句不可以再指定PARTITION
ALTER TABLE语句适用于存量表,在增加了新的聚集属性之后,新的分区将做hash cluster存储。 表属性显示验证
在创建Hash Clustering Table之后,可以通过:
DESC EXTENDED table_name;
来查看表属性,Clustering属性将显示在Extended Info里面,如下图所示:
对于分区表,除了可以使用以上命令查看Table属性之后,于是需要通过以下命令查看分区的属性:
DESC EXTENDED table_name partition(pt_spec);
例如:
Hash Clustering的其他优点 Bucket Pruning优化
考虑以下查询:
CREATE TABLE t1 (id bigint, a string, b string) CLUSTERED BY (id) SORTED BY (id) into 1000 BUCKETS;
...
SELECT t1.a, t1.b, t1.c FROM t1 WHERE t1.id=12345;
对于普通表,这个通常意味着全表扫描操作,如果表非常大的情况下,资源消耗量是非常可观的。但是,因为我们已经对id做Hash Shuffle,并且对id做排序,我们的查询可以大大简化:
通过查询值"12345"找到对应的Hash Bucket,这时候我们只需要在1个Bucket里面扫描,而不是全部1000个。我们称之为“Bucket Pruning”。
以下是安全部基于User ID查询场景的一个例子。下面这个logview是普通的表的查询操作,可以看到,由于数据量很大,一共起了1111个Mapper,读取了427亿条记录,最后找符合条件记录26条,总共耗时1分48秒:
同样的数据,同样的查询,用Hash Clustering表来做,我们可以直接定位到单个Bucket,并利用Index只读取包含查询数据的Page,可以看到这里只用了4个Mapper,读取了10000条记录,总共耗时只需要6秒,如果用service mode这个时间还会更短:
Aggregation优化
例如,对于以下查询:
SELECT department, SUM(salary) FROM employee GROUP BY (department);
在通常情况下,我们会对department进行Shuffle和Sort,然后做Stream Aggregate,统计每一个department group。但是如果表数据已经CLUSTERED BY (department) SORTED BY (department),那么这个Shuffle和Sort的操作,也就相应节省掉了。 存储优化
即便我们不考虑以上所述的各种计算上的优化,单单是把表Shuffle并排序存储,都会对于存储空间节省上有很大帮助。因为MaxCompute底层使用列存储,通过排序,键值相同或相近的记录存放到一起,对于压缩,编码都会更加友好,从而使得压缩效率更高。在实际测试中,某些极端情况下,排序存储的表可以比无序表的存储空间节省50%。对于生命周期很长的表,使用Hash Clustering存储,是一个很值得考虑的优化。
以下是一个简单的实验,使用100G TPC-H lineitem表,包含了int,double,string等多种数据类型,在数据和压缩方式等完全一样的情况下,hash clustering的表空间节省了~10%。

测试数据及分析
对于Hash Clustering整体带来的性能收益,我们通过标准的TPC-H测试集进行衡量。测试使用1T数据,统一使用500 Buckets,除了nation和region两个极小的表以外,其余所有表均按照第一个列作为Cluster和Sort Key。
整体测试结果表明, 在使用了Hash Clustering之后,总CPU时间减少17.3%,总的Job运行时间减少12.8%。
具体各个Query CPU时间对比如下:
Job运行时间对比如下:
需要注意到是TPC-H里并不是所有的Query都可以利用到Clustering属性,特别是两个耗时最长的Query没有办法利用上,所以从总体上的效率提升并不是非常惊人。但如果单看可以利用上Clustering属性的Query,收益还是非常明显的,比如Q4快了68%,Q12快了62%,Q10快了47%,等等。
以下是TPC-H Q4在普通表的Fuxi执行计划:
而下面则是使用Hash Clustering之后的执行计划,可以看到,这个DAG被大大的简化,这也是性能得到大幅提升的关键原因:
功能限制及将来计划
目前Hash Clustering的第一阶段开发工作完成,但还存在以下限制和不足: 不支持insert into,只能通过insert overwrite来添加数据。 不支持tunnel直接upload到range cluster表,因为tunnel上传数据是无序的。
原文链接
大数据
2018-11-13 16:41:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
hdfs及MapReduce相关文章 hdfs 文件存储格式 https://my.oschina.net/u/2969788/blog/2875351 hdfs 块(block) 大小的影响 https://my.oschina.net/u/2969788/blog/2873733 mapreduce执行过程概述 https://my.oschina.net/u/2969788/blog/874649 hdfs 原理 https://my.oschina.net/u/2969788/blog/869403 namenode HA高可用: https://my.oschina.net/u/2969788/blog/3060663 hdfs上传下载文件详解: https://my.oschina.net/u/2969788/blog/4289020
大数据
2018-11-13 18:24:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
spark相关文章 spark 重新分区 repartition和coalesce https://my.oschina.net/u/2969788/blog/2875636 spark的宽依赖和窄依赖 https://my.oschina.net/u/2969788/blog/2870446 spark原理和概念 https://my.oschina.net/u/2969788/blog/2396162 spark streaming 学习笔记 https://my.oschina.net/u/2969788/blog/1844481 spark数据倾斜问题 https://my.oschina.net/u/2969788/blog/884586 spark 为什么比MapReduce快一些 https://my.oschina.net/u/2969788/blog/2966883 转载spark调优1 https://blog.csdn.net/qq_36142114/article/details/80489551 转载spark调优2 https://tech.meituan.com/2016/05/12/spark-tuning-pro.html 转载spark调优3 https://blog.csdn.net/lipviolet/article/details/88092563 转载spark基本概念 https://my.oschina.net/u/2969788/blog/3089181 spark ShuffleManager 详解 https://my.oschina.net/u/2969788/blog/4279683 spark job提交过程: https://my.oschina.net/u/2969788/blog/4282117 spark 内存管理: https://my.oschina.net/u/2969788/blog/4289047
参考博客 https://blog.csdn.net/databatman/article/details/53023818#4shuffle-%E5%92%8C-stage
面试题参考: https://zhuanlan.zhihu.com/p/76518708
大数据
2018-11-13 18:22:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
摘要: 近日,阿里云发布了迁移解决方案,旨在为客户提供一站式迁移综合解决方案,将迁移相关的资源统一呈现,帮助客户安全、稳定和高效迁移上云。 解决什么问题? 客户想要上云,需要自己评估整体方案,但往往会有一些问题无法考虑到,导致投入多、效率低、同时也可能造成潜在风险; 而阿里云新上线的迁移解决方案,为客户提供一站式的迁移解决方案,有专业人士进行整体的方案规划、以及具体的工作拆解,并且确保完整的实施落地;帮助客户安全、稳定、高效的迁移上云。
近日,阿里云发布了迁移解决方案,旨在为客户提供一站式迁移综合解决方案,将迁移相关的资源统一呈现,帮助客户安全、稳定和高效迁移上云。
解决什么问题?
客户想要上云,需要自己评估整体方案,但往往会有一些问题无法考虑到,导致投入多、效率低、同时也可能造成潜在风险;
而阿里云新上线的迁移解决方案,为客户提供一站式的迁移解决方案,有专业人士进行整体的方案规划、以及具体的工作拆解,并且确保完整的实施落地;帮助客户安全、稳定、高效的迁移上云。
团队具有专业技术能力,丰富迁移经验,并且深入一线了解客户核心诉求。某新零售客户,迁移过程中,通过数据库重新选型规划、应用和SQL优化工作、深度压测后,红包系统经过阿里云上云改造之后,吞吐量提升40倍。
适用于哪些客户?
适用客户:各个行业的客户都可适用,目前的成功案例多集中在金融、游戏、物联网+、新零售、政企、互联网、房地产等行业;
举例来说,115科技,国内领先的云存储企业,我们进行了互联网史上规模最大的公共云数据迁移,仅用45天;
https://www.aliyun.com/service/customer-case-detail
飞利浦,全球领先的健康科技与电器公司,4月内完成迁移上云应用系统50+个,成本节省54%;
https://www.aliyun.com/service/customer-case-detail
更多行业案例,请参考
https://www.aliyun.com/service/customer-case
适用于那些场景?
迁移类型包括:
线下IDC机房迁移到阿里云,云上业务系统规划与改造,第三方云平台迁移到阿里云,阿里云跨地域迁移,跨可用区迁移等都适用。
场景包括:
系统与应用迁移:满足业务的功能、性能、安全性以及高可用性
实施云上各类型中间件迁移方案,指导客户完成云上缓存、消息和应用等中间件的购买和开通,完成线下中间件到云上中间件的割接;
存储迁移:非结构化数据快速迁云,支持完成全量与增量数据迁移,协助客户验证全量和增量数据迁移的效果,并完成线下存储数据到云上存储数据的割接
数据库迁移:协助客户完成全量与增量数据库迁移,阿里云实施云上数据库迁移方案,指导客户完成云上数据库的购买和开通,提供技术支持协助您,完成全量和增量数据库迁移
大数据迁移:数据上云到MaxCompute,通过数据传输服务,客户可以将自建机房的数据库实时同步到公有云上任一地域的RDS实例里面,即使发生机房损毁的灾难,数据永远在阿里云有一个备份
混合云组网:大型业务系统迁移复杂,需要采用逐步迁移的混合云组网设计,保证在云上和云下链路畅通,协助客户完成专线或虚拟专用网络(Virtual Private Network,简称VPN)的混合云组网,完成混合云组网的网段、路由及安全策略的实施,及业务系统混合云组网的割接。
迁移解决方案有哪些优势?
(1)多年经验沉淀: 帮助互联网、金融、能源、物流等行业客户将复杂业务系统成功迁移上云,与客户共同完成云化过程。
(2)迁移场景丰富: 多年迁移服务经验,支持IDC或其它云厂商迁至阿里云,以及不同版本与可用区迁移、混合云迁移等各种迁移场景。
(3)真正实现量身定制:拥有深厚的最佳迁移技术实践和方法论沉淀。真正做到基于您的系统特征和业务目标,量身设计迁移方案。
(4)专业交付稳定高效: 完善的迁移上云流程和机制,从POC验证、迁移评估分析、迁移过程管控、业务割接方案到项目管理,保障迁移过程安全、稳定和高效。
怎么使用迁移解决方案?
第一步,评估
阿里云迁移专家对应用系统、存储、数据库等进行专业评估,明确迁移收益和投入风险,制定初步业务迁移规划;
第二步,设计
确定合适的迁移策略,结合业务需求设计云上架构,设计并验证典型应用的平滑高效迁移;
第三步,实施
搭建云上基础设施方案:云产品配置、网络接入、安全策略配置等,并批量迁移数据和应用系统;
第四步,优化
借助阿里云的平台和工具来管理应用程序,并根据业务需求和管理数据持续改进,优化架构。
观看发布会直播
https://yq.aliyun.com/live/579
企业应用实现平滑高效地上云迁移
https://promotion.aliyun.com/ntms/act/migration.html
迁移解决方案更多详情点击
https://www.aliyun.com/solution/migration/index
点击聚能聊,畅所欲言
https://yq.aliyun.com/roundtable/432703
原文链接
大数据
2018-11-13 17:30:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
连接(Join)是关系运算,可以用于合并关系(relation)。对于数据库中的表连接操作,可能已经广为人知了。在 MapReduce 中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情信息。用户基本信息来自于 OLTP 数据库。用户活动详情信息来自于日志文件。
MapReduce 的连接操作可以用于以下场景: 用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)。 当用户超过一定时间没有使用网站后,发邮件提醒他们。(这个一定时间的阈值是用户自己预定义的) 分析用户的浏览习惯。让系统可以基于这个分析提示用户有哪些网站特性还没有使用到。进而形成一个反馈循环。 所有这些场景都要求将多个数据集连接起来。
最常用的两个连接类型是内连接(inner join)和外连接(outer join)。如下图所示,内连接比较两个关系中所有的元组,判断是否满足连接条件,然后生成一个满足连接条件的结果集。与内连接相反的是,外连接并不需要两个关系的元组都满足连接条件。在连接条件不满足的时候,外连接可以将其中一方的数据保留在结果集中。
为了实现内连接和外连接,MapReduce 中有三种连接策略,如下所示。这三种连接策略有的在 map阶段,有的在 reduce 阶段。它们都针对 MapReduce 的排序-合并的架构进行了优化。 重分区连接(Repartition join)— reduce 端连接。使用场景:连接两 个或多个大型数据集。 复制连接(Replication join)— map 端连接。使用场景:待连接的数 据集中有一个数据集足够 半连接(Semi-join)— 另一个 map 端连接。使用场景:待连接的数据 集中有一个数据集非常
2,选择最佳连接策略
要选择连接数据的最优方法,我们这里使用数据驱动的决策树来选择最佳连接策略。
这个决策树可以总结以下三点: 如果其中有一个数据集小到足够放入到一个 mapper 的内存,则 map only 复制连接最有效。 如果两个数据集都很大,其中一个数据集可通过预过滤(与其它数据集数据不匹配的)元素而大大减少体积,则 semi-join(半连接)最合适。 如果不能对数据进行预处理,并且数据体积太大而不能被缓存—这意味着我们不得不在 reducer端执行 join 连接—需要使用重分区连接(repartition join) 不管应用哪种策略,我们在 join 中应该执行的最基本的活动是使用过滤和投影。
3,过滤和投影
使用过滤和投影减少处理的数据量,使用下推优化技术来改善数据管道。过滤和投影工作原理如下图所示:
应该尽可能地靠近数据源执行过滤和投影;在 MapReduce 中,最好是在 mapper 中执行这个工作。例如下面的代码执行过滤:
投影和谓词下推进一步将过滤推进到存储格式。这甚至更高效,特别是使用基于下推可以 skip 过记录或 blocks 的存储格式时。
下表列出了各种存储格式以及是否支持下推:
大数据
2018-11-01 18:36:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
场景介绍:中国裁判文书网hadoop集群中有一台机器因为磁盘损坏导致机器宕机,该机器上面的所有服务都不可用。在对这台机器重新安装系统和对应的服务后,重新加入Hadoop集群,这是需要slave9上重新添加core然后顶替之前宕掉的salve9对应的shard,但是由于在新的slave9上添加core时,没有备份zookeeper中solr对应clusterstatus.sh文件的内容,导致新的slave9的core对应的shard的ranger的值为null;在添加完新的slave9的core后,在solr admin的界面上测试添加,删除和查询都是没有问题的,但是solrj接口查询数据的时候就出现不能插入的问题,报错:org.apache.solr.common.SolrException: No active slice servicing hash code" when writing Ranger Audits to Solr。后来查询solr的router规则原理后找到了解决方法:
解决步骤如下:
1.使用连接zookeeper的工具ZooInspector,连上zookeeper后
2.找到solr配置文件的clusterstatus.sh配置文件
3.然后找到slave9对应的shard4_0,然后修改ranger和parent参数
ranger的范围赋值:0-7fffffff
parent:shard4
4.然后报存就可以了。

参考:
博客: https://community.hortonworks.com/content/supportkb/155362/errorimplcloudsolrclient-cloudsolrclientjavareques-1.html
solr的路由规则:https://lucidworks.com/2013/06/13/solr-cloud-document-routing/
大数据
2018-10-14 16:55:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> ###CENTOS 下 编译安装 tesseract-ocr 3.0.4 识别文字 ###参考资料:https://www.cnblogs.com/dajianshi/p/4932882.html https://www.cnblogs.com/cmyxn/p/7007932.html 如果报错"aclocal:not found" 则安装automake即可,yum -y install automake linux 修复aclocal:not found 未找到此命令. https://ftp.gnu.org/gnu/ https://blog.csdn.net/liguangxianbin/article/details/79742642 之后又报错:"git: command not found" 安装yum -y install git 又报错了 error: C++ compiler cannot create executables 安装yum install gcc gcc-c++ gcc-g77 ##https://www.cnblogs.com/ruigu/p/8302375.html 接着又报错了"libtool: Version mismatch error." 运行命令:autoreconf -ivf ##https://blog.csdn.net/dark_gezi/article/details/59055231 """ Configuration is done. You can now build and install tesseract by running: $ make $ sudo make install You can not build training tools because of missing dependency. Check configure output for details. [root@CNSZ22PL2970 tesseract-3.04.00]# make && make install make all-recursive make[1]: Entering directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00' Making all in ccutil make[2]: Entering directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00/ccutil' make[3]: Entering directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00/ccutil' depbase=`echo ambigs.lo | sed 's|[^/]*$|.deps/&|;s|\.lo$||'`;\ /bin/sh ../libtool --tag=CXX --mode=compile g++ -DHAVE_CONFIG_H -I. -I.. -O2 -DNDEBUG -I/usr/local/include/leptonica -DTESSDATA_PREFIX=/usr/local/share/ -std=c++11 -MT ambigs.lo -MD -MP -MF $depbase.Tpo -c -o ambigs.lo ambigs.cpp &&\ mv -f $depbase.Tpo $depbase.Plo libtool: Version mismatch error. This is libtool 2.4.6, but the libtool: definition of this LT_INIT comes from libtool 2.4.2. libtool: You should recreate aclocal.m4 with macros from libtool 2.4.6 libtool: and run autoconf again. make[3]: *** [ambigs.lo] Error 63 make[3]: Leaving directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00/ccutil' make[2]: *** [all-recursive] Error 1 make[2]: Leaving directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00/ccutil' make[1]: *** [all-recursive] Error 1 make[1]: Leaving directory `/root/tesseract_ocr_pkgs/tesseract-3.04.00' make: *** [all] Error 2 """ 运行命令:tesseract 00.jpg text_info -l eng 报以下错误: Tesseract Open Source OCR Engine v3.04.00 with Leptonica Error in pixReadMemPng: function not present Error in pixReadMem: png: no pix returned Error during processing. [root@CNSZ22PL2970 ~]# tesseract -v tesseract 3.04.00 leptonica-1.72 安装识别图片的依赖包: 参考资料:https://blog.csdn.net/joe8910/article/details/84969195 重新运行:tesseract -v [root@CNSZ22PL2970 ~]# tesseract -v tesseract 3.04.00 leptonica-1.72 libjpeg 6b (libjpeg-turbo 1.2.90) : libpng 1.5.13 : zlib 1.2.7 再次运行识别图片的命令 [root@CNSZ22PL2970 ~]# tesseract 00.jpg text_info -l eng Tesseract Open Source OCR Engine v3.04.00 with Leptonica
大数据
2019-04-04 17:21:05
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
如果说收集算法是内存回收的方法论,那么垃圾收集器就是内存回收的具体实现。
Java虚拟机规范中对垃圾收集器应该如何实现并没有任何规定,因此不同的厂商、不同版本的虚拟机所提供的垃圾收集器都可能会有很大差别,并且一般都会提供参数供用户根据自己的应用特点和要求组合出各个年代所使用的收集器。
HotSpot虚拟机的垃圾回收器
图中展示了7种作用于不同分代的收集器,如果两个收集器之间存在连线,就说明它们可以搭配使用。虚拟机所处的区域,则表示它是属于新生代收集器还是老年代收集器。
概念理解 并发和并行
这两个名词都是并发编程中的概念,在谈论垃圾收集器的上下文语境中,它们可以解释如下。 并行(Parallel) :指多条垃圾收集线程并行工作,但此时用户线程仍然处于等待状态。 并发(Concurrent) :指用户线程与垃圾收集线程同时执行(但不一定是并行的,可能会交替执行),用户程序在继续运行,而垃圾收集程序运行于另一个CPU上。 Minor GC 和 Full GC 新生代GC(Minor GC) :指发生在新生代的垃圾收集动作,因为Java对象大多都具备朝生夕灭的特性,所以Minor GC非常频繁,一般回收速度也比较快。 老年代GC(Major GC / Full GC) :指发生在老年代的GC,出现了Major GC,经常会伴随至少一次的Minor GC(但非绝对的,在Parallel Scavenge收集器的收集策略里就有直接进行Major GC的策略选择过程)。Major GC的速度一般会比Minor GC慢10倍以上。 吞吐量
吞吐量就是CPU用于运行用户代码的时间与CPU总消耗时间的比值,即吞吐量 = 运行用户代码时间 /(运行用户代码时间 + 垃圾收集时间)。
虚拟机总共运行了100分钟,其中垃圾收集花掉1分钟,那吞吐量就是99%。
一、Serial收集器
Serial收集器是最基本、发展历史最悠久的收集器,曾经(在JDK 1.3.1之前)是虚拟机新生代收集的唯一选择。
特性:
这个收集器是一个单线程的收集器,但它的“单线程”的意义并不仅仅说明它只会使用一个CPU或一条收集线程去完成垃圾收集工作,更重要的是在它进行垃圾收集时,必须暂停其他所有的工作线程,直到它收集结束。Stop The World 应用场景:
Serial收集器是虚拟机运行在Client模式下的默认新生代收集器。 优势:
简单而高效(与其他收集器的单线程比),对于限定单个CPU的环境来说,Serial收集器由于没有线程交互的开销,专心做垃圾收集自然可以获得最高的单线程收集效率。
二、ParNew收集器
特性:
ParNew收集器其实就是Serial收集器的 多线程版本 ,除了使用多条线程进行垃圾收集之外,其余行为包括Serial收集器可用的所有控制参数、收集算法、Stop The World、对象分配规则、回收策略等都与Serial收集器完全一样,在实现上,这两种收集器也共用了相当多的代码。 应用场景:
ParNew收集器是许多运行在Server模式下的虚拟机中首选的新生代收集器。
很重要的原因是:除了Serial收集器外,目前只有它能与CMS收集器配合工作。
在JDK 1.5时期,HotSpot推出了一款在强交互应用中几乎可认为有划时代意义的垃圾收集器——CMS收集器,这款收集器是HotSpot虚拟机中第一款真正意义上的并发收集器,它第一次实现了让垃圾收集线程与用户线程同时工作。
不幸的是,CMS作为老年代的收集器,却无法与JDK 1.4.0中已经存在的新生代收集器Parallel Scavenge配合工作,所以在JDK 1.5中使用CMS来收集老年代的时候,新生代只能选择ParNew或者Serial收集器中的一个。 Serial收集器 VS ParNew收集器:
ParNew收集器在单CPU的环境中绝对不会有比Serial收集器更好的效果,甚至由于存在线程交互的开销,该收集器在通过超线程技术实现的两个CPU的环境中都不能百分之百地保证可以超越Serial收集器。
然而,随着可以使用的CPU的数量的增加,它对于GC时系统资源的有效利用还是很有好处的。
三、Parallel Scavenge收集器 特性:
Parallel Scavenge收集器是一个 新生代收集器 ,它也是使用 复制算法 的收集器,又是 并行 的多线程收集器。 应用场景:
停顿时间越短就越适合需要与用户交互的程序,良好的响应速度能提升用户体验,而高吞吐量则可以高效率地利用CPU时间,尽快完成程序的运算任务,主要适合在后台运算而不需要太多交互的任务。 对比分析: Parallel Scavenge收集器 VS CMS等收集器:
Parallel Scavenge收集器的特点是它的关注点与其他收集器不同,CMS等收集器的关注点是尽可能地缩短垃圾收集时用户线程的停顿时间,而Parallel Scavenge收集器的目标则是达到一个 可控制的吞吐量 (Throughput)。
由于与吞吐量关系密切,Parallel Scavenge收集器也经常称为“吞吐量优先”收集器。 Parallel Scavenge收集器 VS ParNew收集器:
Parallel Scavenge收集器与ParNew收集器的一个重要区别是它具有自适应调节策略。
GC自适应的调节策略 :
Parallel Scavenge收集器有一个参数-XX:+UseAdaptiveSizePolicy。当这个参数打开之后,就不需要手工指定新生代的大小、Eden与Survivor区的比例、晋升老年代对象年龄等细节参数了,虚拟机会根据当前系统的运行情况收集性能监控信息,动态调整这些参数以提供最合适的停顿时间或者最大的吞吐量,这种调节方式称为GC自适应的调节策略(GC Ergonomics)。
四、Serial Old收集器
特性:
Serial Old是Serial收集器的 老年代版本 ,它同样是一个 单线程收集器 ,使用 标记-整理 算法。 应用场景: Client模式
Serial Old收集器的主要意义也是在于给Client模式下的虚拟机使用。 Server模式
如果在Server模式下,那么它主要还有两大用途:一种用途是在JDK 1.5以及之前的版本中与Parallel Scavenge收集器搭配使用,另一种用途就是作为CMS收集器的后备预案,在并发收集发生Concurrent Mode Failure时使用。
五、 Parallel Old收集器
特性:
Parallel Old是Parallel Scavenge收集器的 老年代版本 ,使用 多线程 和 “标记-整理” 算法。 应用场景:
在注重吞吐量以及CPU资源敏感的场合,都可以优先考虑Parallel Scavenge加Parallel Old收集器。
这个收集器是在JDK 1.6中才开始提供的,在此之前,新生代的Parallel Scavenge收集器一直处于比较尴尬的状态。原因是,如果新生代选择了Parallel Scavenge收集器,老年代除了Serial Old收集器外别无选择(Parallel Scavenge收集器无法与CMS收集器配合工作)。由于老年代Serial Old收集器在服务端应用性能上的“拖累”,使用了Parallel Scavenge收集器也未必能在整体应用上获得吞吐量最大化的效果,由于单线程的老年代收集中无法充分利用服务器多CPU的处理能力,在老年代很大而且硬件比较高级的环境中,这种组合的吞吐量甚至还不一定有ParNew加CMS的组合“给力”。直到Parallel Old收集器出现后,“吞吐量优先”收集器终于有了比较名副其实的应用组合。
六、 CMS收集器 特性:
CMS(Concurrent Mark Sweep)收集器是一种以获取最短回收停顿时间为目标的收集器。目前很大一部分的Java应用集中在互联网站或者B/S系统的服务端上,这类应用尤其重视服务的响应速度,希望系统停顿时间最短,以给用户带来较好的体验。CMS收集器就非常符合这类应用的需求。

CMS收集器是基于 “标记—清除” 算法实现的,它的运作过程相对于前面几种收集器来说更复杂一些,整个过程分为4个步骤: 初始标记(CMS initial mark)
初始标记仅仅只是标记一下GC Roots能直接关联到的对象,速度很快,需要“Stop The World”。 并发标记(CMS concurrent mark)
并发标记阶段就是进行GC Roots Tracing的过程。 重新标记(CMS remark)
重新标记阶段是为了修正并发标记期间因用户程序继续运作而导致标记产生变动的那一部分对象的标记记录,这个阶段的停顿时间一般会比初始标记阶段稍长一些,但远比并发标记的时间短,仍然需要“Stop The World”。 并发清除(CMS concurrent sweep)
并发清除阶段会清除对象。
由于整个过程中耗时最长的并发标记和并发清除过程收集器线程都可以与用户线程一起工作,所以,从总体上来说,CMS收集器的内存回收过程是与用户线程一起并发执行的。 优点:
CMS是一款优秀的收集器,它的主要优点在名字上已经体现出来了: 并发收集 、 低停顿 。 缺点: CMS收集器对CPU资源非常敏感
其实,面向并发设计的程序都对CPU资源比较敏感。在并发阶段,它虽然不会导致用户线程停顿,但是会因为占用了一部分线程(或者说CPU资源)而导致应用程序变慢,总吞吐量会降低。
CMS默认启动的回收线程数是(CPU数量+3)/ 4,也就是当CPU在4个以上时,并发回收时垃圾收集线程不少于25%的CPU资源,并且随着CPU数量的增加而下降。但是当CPU不足4个(譬如2个)时,CMS对用户程序的影响就可能变得很大。 CMS收集器无法处理浮动垃圾
CMS收集器无法处理浮动垃圾,可能出现“Concurrent Mode Failure”失败而导致另一次Full GC的产生。
由于CMS并发清理阶段用户线程还在运行着,伴随程序运行自然就还会有新的垃圾不断产生,这一部分垃圾出现在标记过程之后,CMS无法在当次收集中处理掉它们,只好留待下一次GC时再清理掉。这一部分垃圾就称为“浮动垃圾”。
也是由于在垃圾收集阶段用户线程还需要运行,那也就还需要预留有足够的内存空间给用户线程使用,因此CMS收集器不能像其他收集器那样等到老年代几乎完全被填满了再进行收集,需要预留一部分空间提供并发收集时的程序运作使用。要是CMS运行期间预留的内存无法满足程序需要,就会出现一次“Concurrent Mode Failure”失败,这时虚拟机将启动后备预案:临时启用Serial Old收集器来重新进行老年代的垃圾收集,这样停顿时间就很长了。 CMS收集器会产生大量空间碎片
CMS是一款基于“标记—清除”算法实现的收集器,这意味着收集结束时会有大量空间碎片产生。
空间碎片过多时,将会给大对象分配带来很大麻烦,往往会出现老年代还有很大空间剩余,但是无法找到足够大的连续空间来分配当前对象,不得不提前触发一次Full GC。
七、 G1收集器
特性:
G1(Garbage-First)是一款面向 服务端应用 的垃圾收集器。HotSpot开发团队赋予它的使命是未来可以替换掉JDK 1.5中发布的CMS收集器。与其他GC收集器相比,G1具备如下特点。 并行与并发
G1能充分利用多CPU、多核环境下的硬件优势,使用多个CPU来缩短Stop-The-World停顿的时间,部分其他收集器原本需要停顿Java线程执行的GC动作,G1收集器仍然可以通过并发的方式让Java程序继续执行。 分代收集
与其他收集器一样,分代概念在G1中依然得以保留。虽然G1可以不需要其他收集器配合就能独立管理整个GC堆,但它能够采用不同的方式去处理新创建的对象和已经存活了一段时间、熬过多次GC的旧对象以获取更好的收集效果。 空间整合
与CMS的“标记—清理”算法不同,G1从 整体来看是基于“标记—整理” 算法实现的收集器,从 局部(两个Region之间)上来看是基于“复制” 算法实现的,但无论如何,这两种算法都意味着G1运作期间不会产生内存空间碎片,收集后能提供规整的可用内存。这种特性有利于程序长时间运行,分配大对象时不会因为无法找到连续内存空间而提前触发下一次GC。 可预测的停顿
这是G1相对于CMS的另一大优势,降低停顿时间是G1和CMS共同的关注点,但G1除了追求低停顿外,还能建立可预测的停顿时间模型,能让使用者明确指定在一个长度为M毫秒的时间片段内,消耗在垃圾收集上的时间不得超过N毫秒。
在G1之前的其他收集器进行收集的范围都是整个新生代或者老年代,而G1不再是这样。使用G1收集器时,Java堆的内存布局就与其他收集器有很大差别,它将整个Java堆划分为多个大小相等的独立区域(Region),虽然还保留有新生代和老年代的概念,但新生代和老年代不再是物理隔离的了,它们都是一部分Region(不需要连续)的集合。
G1收集器之所以能建立可预测的停顿时间模型,是因为它可以有计划地避免在整个Java堆中进行全区域的垃圾收集。G1跟踪各个Region里面的垃圾堆积的价值大小(回收所获得的空间大小以及回收所需时间的经验值),在后台维护一个优先列表,每次根据允许的收集时间,优先回收价值最大的Region(这也就是Garbage-First名称的来由)。这种使用Region划分内存空间以及有优先级的区域回收方式,保证了G1收集器在有限的时间内可以获取尽可能高的收集效率。 执行过程:
G1收集器的运作大致可划分为以下几个步骤: 初始标记(Initial Marking)
初始标记阶段仅仅只是标记一下GC Roots能直接关联到的对象,并且修改TAMS(Next Top at Mark Start)的值,让下一阶段用户程序并发运行时,能在正确可用的Region中创建新对象,这阶段需要停顿线程,但耗时很短。 并发标记(Concurrent Marking)
并发标记阶段是从GC Root开始对堆中对象进行可达性分析,找出存活的对象,这阶段耗时较长,但可与用户程序并发执行。 最终标记(Final Marking)
最终标记阶段是为了修正在并发标记期间因用户程序继续运作而导致标记产生变动的那一部分标记记录,虚拟机将这段时间对象变化记录在线程Remembered Set Logs里面,最终标记阶段需要把Remembered Set Logs的数据合并到Remembered Set中,这阶段需要停顿线程,但是可并行执行。 筛选回收(Live Data Counting and Evacuation)
筛选回收阶段首先对各个Region的回收价值和成本进行排序,根据用户所期望的GC停顿时间来制定回收计划,这个阶段其实也可以做到与用户程序一起并发执行,但是因为只回收一部分Region,时间是用户可控制的,而且停顿用户线程将大幅提高收集效率。
八、总结
虽然我们是在对各个收集器进行比较,但并非为了挑选出一个最好的收集器。因为直到现在为止还没有最好的收集器出现,更加没有万能的收集器,所以我们选择的只是对具体应用最合适的收集器。这点不需要多加解释就能证明:如果有一种放之四海皆准、任何场景下都适用的完美收集器存在,那HotSpot虚拟机就没必要实现那么多不同的收集器了。


大数据
2019-03-28 09:08:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
Hive应用:explode和lateral view
一、explode()
这个函数大多数人都接触过,将一行数据转换成列数据,可以用于array和map类型的数据。
用于array的语法如下: select explode(arraycol) as newcol from tablename; explode():函数中的参数传入的是arrary数据类型的列名。 newcol:是给转换成的列命名一个新的名字,用于代表转换之后的列名。 tablename:原表名。
用于map的语法如下: select explode(mapcol) as (keyname,valuename) from tablename; explode():函数中的参数传入的是map数据类型的列名。 由于map是kay-value结构的,所以它在转换的时候会转换成两列,一列是kay转换而成的,一列是value转换而成的。 keyname:表示key转换成的列名称,用于代表key转换之后的列名。 valuename:表示value转换成的列名称,用于代表value转换之后的列名称。
注意:这两个值需要在as之后用括号括起来然后以逗号分隔。
以上为explode()函数的用法,此函数存在局限性: 其一:不能关联原有的表中的其他字段。 其二:不能与group by、cluster by、distribute by、sort by联用。 其三:不能进行UDTF嵌套。 其四:不允许选择其他表达式。
二、lateral view
lateral view是Hive中提供给UDTF的结合,它可以解决UDTF不能添加额外的select列的问题。
lateral view其实就是用来和想类似explode这种UDTF函数联用的,lateral view会将UDTF生成的结果放到一个虚拟表中,然后这个虚拟表会和输入行进行join来达到连接UDTF外的select字段的目的。
格式一 lateral view udtf(expression) tableAlias as columnAlias (,columnAlias)* lateral view在UDTF前使用,表示连接UDTF所分裂的字段。 UDTF(expression):使用的UDTF函数,例如explode()。 tableAlias:表示UDTF函数转换的虚拟表的名称。 columnAlias:表示虚拟表的虚拟字段名称,如果分裂之后有一个列,则写一个即可;如果分裂之后有多个列,按照列的顺序在括号中声明所有虚拟列名,以逗号隔开。
格式二 from basetable (lateral view)* 在from子句中使用,一般和格式一搭配使用,这个格式只是说明了lateral view的使用位置。 from子句后面也可以跟多个lateral view语句,使用空格间隔就可以了。
格式三 from basetable (lateral view outer)*
它比格式二只是多了一个outer,这个outer的作用是在UDTF转换列的时候将其中的空也给展示出来,UDTF默认是忽略输出空的,加上outer之后,会将空也输出,显示为NULL。这个功能是在Hive0.12是开始支持的。
三、案例
下面来说一个需求案例。
1、需求
有一张hive表,分别是学生姓名name(string),学生成绩score(map),成绩列中key是学科名称,value是对应学科分数,请用一个hql求一下每个学生成绩最好的学科及分数、最差的学科及分数、平均分数。
表数据如下: zhangsan|Chinese:80,Math:60,English:90 lisi|Chinese:90,Math:80,English:70 wangwu|Chinese:88,Math:90,English:96 maliu|Chinese:99,Math:65,English:60
2、准备
下面来做一下准备工作,创建表,并将数据导入表中,操作如下:
创建表: create table student_score(name string,score map) row format delimited fields terminated by '|' collection items terminated by ',' map keys terminated by ':';
导入数据: load data local inpath '/home/test/score' overwrite into table student_score;
检查一下数据,如下图:
确认数据导入没有问题。
3、分析
首先要处理这个表中的数据,本人第一想法是想找一下Hive有没有内置的操作map复杂类型的函数,可惜看了一遍,没有找到,这个思路只能放弃。
第二想法,是将map中的数据转换成一个虚拟表,然后与name字段关联,这样形成一张可操作的虚拟表。在查阅了资料之后,看到explode()函数可以做这个事情,首先写了一条语句: select explode(score) from student_score; select explode(score) as (key,value) from student_score;
结果:
此函数验证了它却是可以做到分离map的功能,将行转为列,难么既然行转了列,那么只需要将name字段关联上,就可以进行统计操作了。
可惜的是,explode函数怎么使用,都关联不了name字段。
既然Hive有这些东西,肯定能够做到关联其他字段的,这是本人作为一个程序员的信念,如果没有的话,这个功能做出来就是鸡肋了,只有关联了其他可以确定其为唯一消息的字段,这样的功能才又意义。
又在网上查询到,经常和explode函数和用的就是lateral view函数,那么这两个结合就能做到关联其他字段。写法如下: select name,key,value from student_score lateral view explode(score) scntable as key,value;
结果如下:
看到上面的数据,就是我们想要的结果,产生了这样一个虚拟表之后,所有的工作都变的简单了起来。
从上面两条语句可以看出,explode在select句中和在from子句中给虚拟字段命名的格式稍微有些差别,select句中需要加括号,from子句中不需要括号。
以上是这个需求的难点,其他的就不在做过多的说明。
4、结果
下面将结果抛出来,这可能不是最优的,但是是一种方式: select sname,gk,gv,bk,bv,av from ( select * from ( select C.name as sname,C.key as gk,C.value as gv from ( select name,max(value) as gv from ( select name,key,value from student_score lateral view explode(score) scnTable as key,value) as A group by name) as B left join (select name,key,value from student_score lateral view explode(score) scnTable as key,value) as C on B.name=C.name and B.gv=C.value) as GG left join (select C.name as bname,C.key as bk,C.value as bv from (select name,min(value) as bv from ( select name,key,value from student_score lateral view explode(score) scnTable as key,value) as A group by name) as B left join (select name,key,value from student_score lateral view explode(score) snTable as key,value) as C on B.name=C.name and B.bv=C.value) as BB on GG.sname=BB.bname) as SS left join (select name as aname,avg(value) as av from ( select name,key,value from student_score lateral view explode(score) scnTable as key,value) as A group by name) AA on SS.sname=AA.aname
结果如下:
列名依次为:姓名、最好成绩的科目、分数、最差成绩的科目、分数、平均分
这里需要说一些,Hive中的基本数据类型,string类型应该是使用的自动转换机制,转换为了int,这里将score map声明为score map也是可以的。
四、测试
本人的hive环境为1.1.0CDH5版,此时将上面的数据做一下修改,来测试一下outer的作用,数据如下: zhangsan|Chinese:80,Math:60,English:90 lisi|Chinese:90,Math:80,English:70 wangwu|Chinese:88,Math:90,English:96 maliu|Chinese:99,Math:65,English:
将maliu的英语成绩给去掉,然后导入到表中。
在执行以下语句: select explode(score) from student_score;
结果如下:
可以看到最后一个English的成绩没有显示任何东西,也就是被UDTF给忽略了。
下面使用lateral view看一下: select name,key,value from student_score lateral view explode(score) scntable as key,value;
结果如下:
最后加上outer在试一下: select name,key,value from student_score lateral view outer explode(score) scntable as key,value;
结果如下:
结果和没加outer是一样的,这就又是一个新的问题了,outer是否只对arrary类型的有效,对map类型无效呢?
本人又将arrary类型进行了测试,和map同样,都是什么都不显示,只是将没有的一列做了联表匹配。
这个问题有待研究。
上一篇: Hive语法:union
下一篇: Hive动态分区
大数据
2019-03-27 13:04:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
作者:RickyHuo
本文转载自公众号「大道至简bigdata」
原文链接 : 优秀的数据工程师,怎么用 Spark 在 TiDB 上做 OLAP 分析 TiDB 是一款定位于在线事务处理/在线分析处理的融合型数据库产品,实现了一键水平伸缩,强一致性的多副本数据安全,分布式事务,实时 OLAP 等重要特性。 TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势。直接使用 TiSpark 完成 OLAP 操作需要了解 Spark,还需要一些开发工作。那么,有没有一些开箱即用的工具能帮我们更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢? 目前开源社区上有一款工具 Waterdrop,可以基于 Spark,在 TiSpark 的基础上快速实现 TiDB 数据读取和 OLAP 分析。项目地址: https://github.com/InterestingLab/waterdrop
使用 Waterdrop 操作 TiDB
在我们线上有这么一个需求,从 TiDB 中读取某一天的网站访问数据,统计每个域名以及服务返回状态码的访问次数,最后将统计结果写入 TiDB 另外一个表中。 我们来看看 Waterdrop 是如何实现这么一个功能的。
Waterdrop
Waterdrop 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在 Spark 之上。Waterdrop 拥有着非常丰富的插件,支持从 TiDB、Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,然后将结果写入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。
准备工作
1. TiDB 表结构介绍
Input(存储访问日志的表) CREATE TABLE access_log ( domain VARCHAR(255), datetime VARCHAR(63), remote_addr VARCHAR(63), http_ver VARCHAR(15), body_bytes_send INT, status INT, request_time FLOAT, url TEXT ) +-----------------+--------------+------+------+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-----------------+--------------+------+------+---------+-------+ | domain | varchar(255) | YES | | NULL | | | datetime | varchar(63) | YES | | NULL | | | remote_addr | varchar(63) | YES | | NULL | | | http_ver | varchar(15) | YES | | NULL | | | body_bytes_send | int(11) | YES | | NULL | | | status | int(11) | YES | | NULL | | | request_time | float | YES | | NULL | | | url | text | YES | | NULL | | +-----------------+--------------+------+------+---------+-------+
Output(存储结果数据的表) CREATE TABLE access_collect ( date VARCHAR(23), domain VARCHAR(63), status INT, hit INT ) +--------+-------------+------+------+---------+-------+ | Field | Type | Null | Key | Default | Extra | +--------+-------------+------+------+---------+-------+ | date | varchar(23) | YES | | NULL | | | domain | varchar(63) | YES | | NULL | | | status | int(11) | YES | | NULL | | | hit | int(11) | YES | | NULL | | +--------+-------------+------+------+---------+-------+
2. 安装 Waterdrop
有了 TiDB 输入和输出表之后, 我们需要安装 Waterdrop,安装十分简单,无需配置系统环境变量 准备 Spark 环境 安装 Waterdrop 配置 Waterdrop
以下是简易步骤,具体安装可以参照 Quick Start。 # 下载安装Spark cd /usr/local wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz wget # 下载安装Waterdrop https://github.com/InterestingLab/waterdrop/releases/download/v1.2.0/waterdrop-1.2.0.zip unzip waterdrop-1.2.0.zip cd waterdrop-1.2.0 vim config/waterdrop-env.sh # 指定Spark安装路径 SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}
实现 Waterdrop 处理流程
我们仅需要编写一个 Waterdrop 配置文件即可完成数据的读取、处理、写入。
Waterdrop 配置文件由四个部分组成,分别是 Spark 、 Input 、 Filter 和 Output 。 Input 部分用于指定数据的输入源, Filter 部分用于定义各种各样的数据处理、聚合, Output 部分负责将处理之后的数据写入指定的数据库或者消息队列。
整个处理流程为 Input -> Filter -> Output ,整个流程组成了 Waterdrop 的处理流程(Pipeline)。 以下是一个具体配置,此配置来源于线上实际应用,但是为了演示有所简化。
Input (TiDB)
这里部分配置定义输入源,如下是从 TiDB 一张表中读取数据。 input { tidb { database = "nginx" pre_sql = "select * from nginx.access_log" table_name = "spark_nginx_input" } }
Filter
在 Filter 部分,这里我们配置一系列的转化, 大部分数据分析的需求,都是在 Filter 完成的。Waterdrop 提供了丰富的插件,足以满足各种数据分析需求。这里我们通过 SQL 插件完成数据的聚合操作。 filter { sql { table_name = "spark_nginx_log" sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)" } }
Output (TiDB)
最后, 我们将处理后的结果写入 TiDB 另外一张表中。TiDB Output 是通过 JDBC 实现的。 output { tidb { url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8" table = "access_collect" user = "username" password = "password" save_mode = "append" } }
Spark
这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小以及其他 Spark 配置。
我们的 TiDB Input 插件是基于 TiSpark 实现的,而 TiSpark 依赖于 TiKV 集群和 Placement Driver (PD)。因此我们需要指定 PD 节点信息以及 TiSpark 相关配置 spark.tispark.pd.addresses 和 spark.sql.extensions 。 spark { spark.app.name = "Waterdrop-tidb" spark.executor.instances = 2 spark.executor.cores = 1 spark.executor.memory = "1g" # Set for TiSpark spark.tispark.pd.addresses = "localhost:2379" spark.sql.extensions = "org.apache.spark.sql.TiExtensions" }
运行 Waterdrop
我们将上述四部分配置组合成我们最终的配置文件 conf/tidb.conf spark { spark.app.name = "Waterdrop-tidb" spark.executor.instances = 2 spark.executor.cores = 1 spark.executor.memory = "1g" # Set for TiSpark spark.tispark.pd.addresses = "localhost:2379" spark.sql.extensions = "org.apache.spark.sql.TiExtensions" } input { tidb { database = "nginx" pre_sql = "select * from nginx.access_log" table_name = "spark_table" } } filter { sql { table_name = "spark_nginx_log" sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)" } } output { tidb { url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8" table = "access_collect" user = "username" password = "password" save_mode = "append" } }
执行命令,指定配置文件,运行 Waterdrop ,即可实现我们的数据处理逻辑。 Local
./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master 'local[2]' yarn-client
./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master yarn yarn-cluster
./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode cluster -master yarn
如果是本机测试验证逻辑,用本地模式(Local)就可以了,一般生产环境下,都是使用 yarn-client 或者 yarn-cluster 模式。
检查结果 mysql> select * from access_collect; +------------+--------+--------+------+ | date | domain | status | hit | +------------+--------+--------+------+ | 2019-01-20 | b.com | 200 | 63 | | 2019-01-20 | a.com | 200 | 85 | +------------+--------+--------+------+ 2 rows in set (0.21 sec)
总结
在这篇文章中,我们介绍了如何使用 Waterdrop 从 TiDB 中读取数据,做简单的数据处理之后写入 TiDB 另外一个表中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。
除了支持 TiDB 数据源之外,Waterdrop 同样支持 Elasticsearch,Kafka,Kudu, ClickHouse 等数据源。
与此同时,我们正在研发一个重要功能,就是在 Waterdrop 中,利用 TiDB 的事务特性,实现从 Kafka 到 TiDB 流式数据处理,并且支持端(Kafka)到端(TiDB)的 Exactly-Once 数据一致性 。
希望了解 Waterdrop 和 TiDB,ClickHouse、Elasticsearch、Kafka 结合使用的更多功能和案例,可以直接进入项目主页: https://github.com/InterestingLab/waterdrop ,或者联系项目负责人: Garyelephan(微信: garyelephant)、RickyHuo (微信: chodomatte1994)。
大数据
2019-02-28 10:22:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
作用 :解析用户传递的搜索信息,抽取出关键字,然后进行过滤,最后去索引库检索,返回用户需要的信息,例如百度的关键字竞价机制
Solr分析器组成:分词器和过滤器 分词器:抽取关键字 过滤器:过滤关键字
Analyzers:分析器
作用:检查域的文本和生成token stream,分析器通过scheme.xml配置文件中的fieldType元素配置,传统的搜索时全文匹配,分析器可以通过分析用户输入的文本字符抽取term(关键字),进行查询然后汇总结果,这样搜索引擎更加智能化
简单案例:schema.xml配置 分析:为solr.TextField的域类型指定分析器
复杂定义分析器: 分析:Field的Text首先被传递给第一个solr.StandardTokenizerFactory,最后一个组件产生terms,词语,用于索引或查询nameText类型的field
注意: org.apache.solr.analysis包里面的类可以直接被引用为solr.为前缀的类 例: solr.EnglishPorterFilterFactory 等价于 org.apache.solr.analysis. EnglishPorterFilterFactory
Analyzer会影响在给定Filed里面索引的term,但是不会影响其实际存储的值
Analysis Phases:分析阶段*
分析发生在2种情况: 索引节点: 建立索引的时候Filed被创建时,语句分析生成的Token Stream被添加到索引并且TokenStream定义了Field的语句集 查询阶段:搜索的值被解析,并且匹配的结果语句集被保存在Field的索引
通常同样的分应用于index/query两个阶段,当进行准确的字符串匹配时效果令人满意,在其他的情况下索引和查询阶段可以轻微不同
多语句扩展分析
一些情况下Analysis处理不是自然语言,像同义词、过滤的停止词Analysis Factory可以在进行这种类型的查询,例如小写处理,分析工厂典型MultiTermAwareComponent,当solr对多语句集的结果执行查询分析事,只有MultiTermAwareComponent能用于查询分析器中,其他的Factory会被忽略
分词器:Tokenizer
作用:分词器主要用于拆分文本流为Tokens,每个Token是文本流的一个子串,分析器直到solr配置的Field,但是Tokenizer不知道,因此分词器只是进行简单的拆分工作
中文分词器ikAnalyzer配置步骤: 引入分词器jar包(ikAnalyzer2012_FF_u1)到lib目录中:solr/web-inf/lib 复制IKAnalyzer的配置文件和自定义词典和停用词典到solr的classpath目录下 配置fieldType指定使用中文分词器:注意analyzer和tokenizer的区分
4.配置Field使用ik分词器的fieldType
效果:
注意:新版本的solr可能和IK分词器不兼容,需修改IK分词器源码
大数据
2019-02-26 23:15:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
列出HDFS文件系统指定目录下文件:hadoop dfs –ls 目录 -d:只显示目录信息,不显示子目录 -h:以M为单位的内存显示 -R:子目录递归显示
HDFS创建目录:hadoop fs -mkdir [-p]
HDFS删除目录:hadoop fs -rmdir [--ignore-fail-on-non-empty] URI [URI ...]
创建空创建空文件:hadoop fs -touchz pathname
清空所有文件信息: hadoop fs -truncate [-w] -w:如果要清空文件正在处理,则必须等待其处理完成之后再清空,这个期间该文件不能再次被打开
上传文件到HDFS: Hadoop dfs -put 本地文件路径 HDFS文件目录
从HDFS下载文件:hadoop dfs –get hdfs文件路径 本地目录
本地文件复制到HDFS:Hadoop dfs [-f] [-p] 本地文件路径 HDFS文件目录
本地目录复制到HDFS: hadoop dfs -copyFromLocal 本地路径 HDFS目录路径
复制HDFS文件到本地:hadoop dfs -copyToLocal hdfs文件 本地目录
本地目录移动到HDFS: hadoop fs -moveFromLocal 先复制后删除本地目录
HDFS目录移动本地:hadoop fs -moveToLocal [-crc] hdfs文件路径 本地目录(还未实现)
HDFS文件系统文件移动:hadoop fs -mv URI [URI ...] hdfs文件路径
注意 :只能在同一个HDFS内部移动,不能通过网络等途径跨文件系统移动
文件数据追加:hadoop fs -appendToFile 本地文件 hdfs文件路径
查询指定目录的子目录信息:hadoop fs -count [-q] [-h] [-v] -q : 指定输出列参数项 -h: 显示文件的大小 -v: 显示标题行 返回:成功 0 失败 -1
HDFS删除文件目录:hadoop fs -rm [-f] [-r |-R] [-skipTrash] URI [URI ...] -f:如果文件不存在不会显示错误等提示信息 -R/-r:等价:递归删除子目录 -skipTrash:清理垃圾文件,对于从超过文件容量的目录中删除文件很有用
查看HDFS文本文件信息:hadoop dfs –cat hdfs文件路径
查询可用HDFS内存空间:hadoop fs -df [-h] URI [URI ...] -h:将内存以M的形式形式
查询目录下文件信息:hadoop fs -du [-s] [-h] URI [URI ...] -h: -h:将内存以M的形式形式 -s : 统计所有子文件信息,统一展示
个性化查询目录信息: hadoop fs -stat [format] ... %F:显示文件的类型:目录/文件 %u:显示文件的拥有者 %g:显示文件的组别 %o:按照块的文件大小 %r:显示文件的复制因子 %y:显示文件的修改日期 %n:显示文件名
查询文件最后1M的信息: hadoop fs -tail [-f] URI -f:输出文件增长的数据:动态显示
测试文件的信息:hadoop fs -test -[defsz] URI -d:判断文件是否是目录类型:0 -e:判断文件是否存在:0 -f:判断文件是否是文件类型: 0 -s:判断文件是否非空: 0 -z:判断文件是0字节 :0
查找文件:hadoop fs -find hdfs文件路径 文件名表达式 例:hdfs dfs -find /user/squirrel -name spark* -print -name:查找文件名的表达式:支持正则表达式 -print:打印文件路径信息
显示HDFS文件访问授权信息:hadoop fs -getfacl [-R] -R:显示目录子文件的权限信息
注意:开启文件ACL访问信息查询权限:dfs.namenode.acls.enabled true
设置文件的访问权限:hadoop fs -setfacl [-R] [-b |-k -m |-x acl_spec path] |[--set ] -b:删除除了基础权限的所有权限 -k:清除默认权限 -R:递归授予子文件权限 -m:修改ACL,添加新的权限信息,原有的权限会保留 -x:清楚指定的ACL实体信息 -set:替换原有的ACL信息,重新授权
例:hadoop fs -setfacl --set user::rw- /file
显示文本文件信息:hadoop dfs -cat 文件路径 或者 hadoop dfs -text 文件路径
设置HDFS文件的备份因子:hadoop fs -setrep [-R] [-w] 备份数 hdfs文件 -W:要求等待备份命令的完成,可能需要消耗很多时间 -R:递归子类文件备份银子设置
合并HDFS的文件到本地:hadoop fs -getmerge [-nl] hdfs文件列表 本地文件路径 -nl:每个文件之间的内容使用空行分开
控制HDFS文件访问权限:dfs -chmod -R 777
HDFS文件路径(避免出现permission denied问题)
hadoop dfsadmin –report: 查看hdfs的状态报告
hadoop安全模式处理:hadoop dfsadmin –safemode enter/leave
大数据
2019-02-24 22:58:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
以黄页88为例,采集当前列表页新闻的正文数据:
第一步:新建任务
①点击左上角“加号”新建任务,如图1:
【图1】
②在弹窗里填写采集地址,任务名称,如图2:
【图2】
③点击下一步,选择进行数据抽取还是链接抽取,本次采集当前列表页联系方式链接,正文数据是通过点击列表链接进入的,所以本次需要抽取列表链接,所以点击抽取链接,翻页选择普通翻页,如图3:
【图3】
④完成后模板抽取配置列表有两个模板,默认模板和链接列表。默认模板下自动生成两个链接抽取,一个为链接列表抽取,已与模板“链接列表”关联,一个为普通翻页链接,此链接已与默认模板自身关联,如图4。
【图4】
第二步:通过地址过滤,得到所需的联系方式链接。
①点击采集预览,在采集预览中有于目标链接相似的其他链接,可通过地址过滤得到联系方式链接。找到所需要的链接,右击复制链接,如图5所示。
【图5】
②勾选地址过滤,过滤规则选择包含,将复制的目标地址粘入,使用公共部分“company_contact.html”进行地址过滤,得到所需链接,如图6所示。
【图6】
③点击采集预览确认链接是否过滤完全,如图7
【图7】
第三步:过滤翻页链接
①在采集预览中选择普通翻页,在采集预览中有于目标链接相似的其他链接,可通过地址过滤得到列表链接。找到所需要的列表链接,观察得出所需要的目标链接都包含“pn+数字”。使用过滤串“\d”,右击复制链接,如图8所示。 过滤串规则说明:\d 表示一串(个)数字
【图8】
②勾选地址过滤,过滤规则选择包含,填入“pn\d”,得到列表链接,如图9所示。
【图9】
③右击模板一,选择模板预览,采集预览确认链接是否过滤完全,如图10:
【图10】
第四步:填写模板二示例地址并新建数据抽取
①将模板一过滤得到的任意一条链接,作为模板二的示例地址, 见图11:
【图11】
②新建数据抽取
方法一:通过点击“下一步”后勾选抽取数据,再次点击“下一步”得到数据抽取。
方法二:直接点击模板二,点击上面“新建数据抽取”按钮,得到数据抽取,如图12
【图12】
第五步:创建/选择表单
在ForeSpider爬虫中,表单是可以复用的,所以可以在数据表单出直接选择之前建过的表单,也可以通过表单ID来进行查找并关联数据表单。此处使用的方法三。
方法一:通过下拉菜单或表单ID选择已有表单
方法二:点击创建表单进入快速建表页面,新建表单。
方法三:点击“采集配置”-“数据建表”,点击采“采集表单”后面的添加按钮,如图13:
【图13】
第六步:配置表单
根据所需内容,配置表单字段(即表头),此处配置了网页主键、联系人、联系方式、公司名称、公司网址链接共5个字段,表单如图14:
【图14】
第七步:字段取值
①在数据抽取部分导入表单:黄页
【图15】
②Per_name字段。如图16所示,绿框选中联系人,确认选区,然后选择脚本处理,取值“:”之后的姓名。
选区取值方法:按住Ctrl+鼠标左键,进行区域选择,按住Shift+鼠标左键,扩大选择区域。
代码:return VALUE.Right(":");
代码如图17所示。
【图16】
【图17】
③Com_name 字段。同per_name字段,绿框选中公司名称,确认选区,然后选择脚本处理,取值“:”之后的名称。
选区取值方法:按住Ctrl+鼠标左键,进行区域选择,按住Shift+鼠标左键,扩大选择区域。
代码:return VALUE.Right(":");
④Tel 字段。如图18所示,因为在模板预览中链接标题就是手机号,所以采用了高级取值---脚本取值。
【图18】
直接在脚本处写上“return URL.title;” 即链接标题名为手机号,如图19所示。
代码:return URL.title;
【图19】
⑤Link字段。采集内容选择网页信息>网页地址,保存即可,如图20所示。
【图20】
第八步:模板预览
①鼠标右键点击“数据抽取”,然后点击“模板预览”,如图21。
【图21】
②预览结果,如图22所示。
【图22】
第九步:采集预览
①点击右上角采集预览,如图23:
【图23】
②双击任意一条链接,看看是否可以得到和网页对应的规整的数据,如图24所示。
【图24】
大数据
2019-01-23 18:54:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
采集字段:
用户、图片1、图片2 、图片3、 评论内容、 颜色及尺码、 最新评论时间
采集工具:
后羿采集软件
采集结果预览:
下面我们来详细介绍一下如何使用流程图模式,采集淘宝商品的评论,我们以淘宝店铺毛菇小象家的毛呢短裤的评论为例,具体步骤如下:
步骤一:新建采集任务
1、复制 淘宝商品评论 的网页地址(需要搜索结果页的网址,而不是首页的网址)
2、新建流程图模式采集任务
步骤二:配置采集规则
1、点击评论
输入网址后,我们进入淘宝宝贝的详情页,此时页面上出现的登录界面我们可以点击关闭,在详情页上可以看到评论数但是看不到具体的评论内容,我们需要点击评论,然后在左上角跳转出来的提示框中选择“点击该元素”。

2、设置提取字段数据
跳转到评论界面后,我们点击网页上的评论字段,在左上角的操作提示框内选择提取全部元素。

提取出评论列表页上的字段之后,我们可以右击字段进行相关设置,包括修改字段名称、增减字段、处理数据等。
点此 了解更多关于提取字段组件的内容。
我们需要采集评论内容、用户名称、评论发布时间及评论图片等信息,字段设置效果如下:
3、设置下一页
我们采集出了单页的评论数据,现在需要采集下一页的数据,我们点击页面上的“下一页”按钮,在左上角出现的操作提示框内选择“循环点击下一页”。
点此 了解更多关于翻页的内容。

步骤四:设置并启动采集任务
点击“保存并启动”按钮,可在弹出的页面中进行一些高级设置,包括定时启动、自动入库和下载图片,本次示例中未使用到这些功能,直接点击“启动”运行爬虫工具。
步骤五:导出并查看数据
数据采集完成后,我们可以查看和导出数据,软件支持多种导出方式(手动导出到本地、手动导出到数据库、自动发布到数据库、自动发布到网站)和导出文件的格式(EXCEL、CSV、HTML和TXT),我们选择自己需要方式和文件类型,点击“确认导出”。
再为您推荐几个关于电商的采集教程:
如何免费采集蘑菇街商品销售数据
如何免费采集亚马逊商品信息数据
如何免费采集当当网商品信息数据
大数据
2019-01-15 13:31:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
jion分类
当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。
选择思路大概是:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点不再适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。
Hash Jion
将小表转换成Hash Table,用大表进行遍历,对每个元素去Hash Table里查看是否存在,存在则进行jion。
先来看看这样一条SQL语句: select * from order,item where item.id = order.i_id ,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采用的是hash join算法,整个过程会经历三步: 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。 探测(Probe):再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。
基本流程可以参考上图,这里有两个小问题需要关注: hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b) 为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用;
上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案: broadcast hash join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。 shuffler hash join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。
Broadcast Hash Join
broadcast hash join可以分为两步: broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路; hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数 spark.sql.autoBroadcastJoinThreshold ,默认为10M。
BroadcastNestedLoopJoin
cross jion 在进行笛卡尔集运算时使用了嵌套云环的jion方式,也就是我们常用的两个for循环嵌套进行判断。
Shuffle Hash Join
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步: shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle hash join阶段:每个分区节点上的数据单独执行单机hash join算法。
Sort-Merge Join shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理 sort阶段:对单个分区节点的两表数据,分别进行排序 merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出( 相比hash jion解决了大表不能全部hash到内存中的问题 )
仔细分析的话会发现,sort-merge join的代价并不比shuffle hash join小,反而是多了很多。那为什么SparkSQL还会在两张大表的场景下选择使用sort-merge join算法呢?这和Spark的shuffle实现有关, 目前spark的shuffle实现都适用sort-based shuffle算法,因此在经过shuffle之后partition数据都是按照key排序的。因此理论上可以认为数据经过shuffle之后是不需要sort的,可以直接merge 。
Hash Jion优化
hash jion中使用了谓词 布隆过滤器 进行下推实现了 FR(Runtime Filter) , 对jion操作进一步做了优化。
谓词下推 其一是逻辑执行计划优化层面的说法,比如SQL语句:select * from order ,item where item.id = order.item_id and item.category = ‘book’,正常情况语法解析之后应该是先执行Join操作,再执行Filter操作。通过谓词下推,可以将Filter操作下推到Join操作之前执行。即将where item.category = ‘book’下推到 item.id = order.item_id之前先行执行。 其二是真正实现层面的说法,谓词下推是将过滤条件从计算进程下推到存储进程先行执行(存储与计算分离的场景)减少IO,网络,内存等开销。例如将filter(bloomfilter)操作从excutor(计算进程)中转移到datanode(存储进程)中完成。
存在问题
由于spark CBO 分析的不准确问题导致broadcastjoin 经常出现乱广播的情形。
参考
http://hbasefly.com/2017/03/19/sparksql-basic-join/
http://hbasefly.com/2017/04/10/bigdata-join-2/
大数据
2019-01-14 11:58:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
[TOC]
一、概述
随着业务发展和数据量的增加,大数据应用开发已成为部门应用开发常用的开发方式,由于部门业务特点的关系,spark和hive应用开发在部门内部较为常见。当处理的数据量达到一定量级和系统的复杂度上升时,数据的唯一性、完整性、一致性等等校验就开始受到关注,而通常做法是根据业务特点,额外开发job如报表或者检查任务,这样会比较费时费力。
目前遇到的表大部分在几亿到几十亿的数据量之间,并且报表数量在不断增加,在这种情况下,一个可配置、可视化、可监控的数据质量工具就显得尤为重要了。Griffin 数据质量监控工具正是可以解决前面描述的数据质量问题的开源解决方案。
二、Apache Griffin
Griffin起源于eBay中国,并于2016年12月进入Apache孵化器,Apache软件基金会2018年12月12日正式宣布Apache Griffin毕业成为Apache顶级项目。
Griffin是属于模型驱动的方案,基于目标数据集合或者源数据集(基准数据),用户可以选择不同的数据质量维度来执行目标数据质量的验证。支持两种类型的数据源:batch数据和streaming数据。对于batch数据,我们可以通过数据连接器从Hadoop平台收集数据。对于streaming数据,我们可以连接到诸如Kafka之类的消息系统来做近似实时数据分析。在拿到数据之后,模型引擎将在spark集群中计算数据质量。
2.1 特性 度量:精确度、完整性、及时性、唯一性、有效性、一致性。 异常监测:利用预先设定的规则,检测出不符合预期的数据,提供不符合规则数据的下载。 异常告警:通过邮件或门户报告数据质量问题。 可视化监测:利用控制面板来展现数据质量的状态。 实时性:可以实时进行数据质量检测,能够及时发现问题。 可扩展性:可用于多个数据系统仓库的数据校验。 可伸缩性:工作在大数据量的环境中,目前运行的数据量约1.2PB(eBay环境)。 自助服务:Griffin提供了一个简洁易用的用户界面,可以管理数据资产和数据质量规则;同时用户可以通过控制面板查看数据质量结果和自定义显示内容。
2.1.1 数据质量指标说明 精确度:度量数据是否与指定的目标值匹配,如金额的校验,校验成功的记录与总记录数的比值。 完整性:度量数据是否缺失,包括记录数缺失、字段缺失,属性缺失。 及时性:度量数据达到指定目标的时效性。 唯一性:度量数据记录是否重复,属性是否重复;常见为度量为hive表主键值是否重复。 有效性:度量数据是否符合约定的类型、格式和数据范围等规则。 一致性:度量数据是否符合业务逻辑,针对记录间的逻辑的校验,如:pv一定是大于uv的,订单金额加上各种优惠之后的价格一定是大于等于0的。
2.2 优势 可配置、可自定义的数据质量验证。 基于spark的数据分析,可以快速计算数据校验结果。 历史数据质量趋势可视化。
2.3 工作流程 注册数据,把想要检测数据质量的数据源注册到griffin。 配置度量模型,可以从数据质量维度来定义模型,如:精确度、完整性、及时性、唯一性等。 配置定时任务提交spark集群,定时检查数据。 在门户界面上查看指标,分析数据质量校验结果。
2.4 系统架构
Griffin 系统主要分为:数据收集处理层(Data Collection&Processing Layer)、后端服务层(Backend Service Layer)和用户界面(User Interface),如图:
系统数据处理分层结构图:
系统处理流程图:
2.5 数据验证逻辑
2.5.1 精确度验证(accurancy),从hive metadata中加载数据源,校验精确度 选择source表及列 选择target表及列 选择字段比较规则(大于、小于或者相等) 通过一个公式计算出结果:
最后在控制面板查看精确度趋势
2.5.2 数据统计分析(profiling) 选择需要进行分析的数据源,配置字段等信息。 简单的数据统计:用来统计表的特定列里面值为空、唯一或是重复的数量。例如统计字段值空值记录数超过指定一点阈值,则可能存在数据丢失的情况。 汇总统计:用来统计最大值、最小值、平均数、中值等。例如统计年龄列的最大值最小值判断是否存在数据异常。 高级统计:用正则表达式来对数据的频率和模式进行分析。例如邮箱字段的格式验证,指定规则的数据验证。 数据分析机制主要是基于Spark的MLlib提供的列汇总统计功能,它对所有列的类型统计只计算一次。 控制面板分析数据
2.5.3 异常检测 异常检测的目标是从看似正常的数据中发现异常情况,是一个检测数据质量问题的重要工具。通过使用BollingerBands和MAD算法来实现异常检测功能,可以发现数据集中那些远远不符合预期的数据。 以MAD作为例子,一个数据集的MAD值反映的是每个数据点与均值之间的距离。可以通过以下步骤来得到MAD值: 算出平均值 算出每一个数据点与均值的差 对差值取绝对值 算出这些差值取绝对值之后的平均值
公式如下:
通过异常检测可以发现数据值的波动大小是否符合预期,数据的预期值则是在对历史趋势的分析中得来的,用户可以根据检测到的异常来调整算法中必要的参数,让异常检测更贴近需求。
2.6 Demo
以检测供应商账单明细表的同步精确度为例,配置数据检测,如图: 选择数据源
选择账单明细源表字段
选择账单明细目标表字段
设置源表和目标表的校验字段映射关系
选择数据分区、条件和是否输出结果文件。(无分区表可以跳过)
设置验证项目名称和描述,提交后就可以在列表看到度量的信息了

创建了数据模型度量后,需要相应的spark定时任务来执行分析,接下来就是创建spark job和调度信息了 在job菜单下,选择Create Job
创建job界面中需要选择源表和目标表数据范围,如上图所示是选择t-1到当前的数据分区,即昨天的数据分区。设置定时表达式,提交任务后即可在job列表中查看:
到这里,数据验证度量和分析任务都已配置完成,后面还可根据你的指标设置邮件告警等监控信息,接下来就可以在控制面板上监控你的数据质量了,如图:
2.7 后台提交监控任务
除了用户在控制面板创建数据验证任务,也可以通过后台生成指标信息,提交spark任务进行数据检测,提供了良好的数据接入和配置的扩展性,api配置数据检测可查看官网 快速指引 。
实时数据检测目前未有界面配置,可以通过api的方式提交实时数据监控,详细内容可以参考: Streaming Use Cases 。
赖泽坤@vipshop.com
参考文档 Griffin – 模型驱动的数据质量服务平台 开源数据质量解决方案Griffin-介绍篇 Apache Griffin User Guide 公司内部数据质量平台DQC Griffin Workflow
大数据
2019-01-07 23:25:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
Hive是基于Hadoop的数据仓库工具,可对存储在HDFS上的文件中的数据进行数据整理、特殊查询和分析处理,提供了类似于SQL语言的查询语言-HiveSQL,可通过HQL语句实现简单的MR统计,Hive将HQL语句转换成MR任务进行执行。

1-1 数据仓库概念
数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反应历史变化(Time Variant)的数据集合,用于支持管理决策。
数据仓库体系结构通常含四个层次 :数据源、数据存储和管理、数据服务、数据应用。
数据源:是数据仓库的数据来源,含外部数据、现有业务系统和文档资料等 ;数据集成:完成数据的抽取、清洗、转换和加载任务,数据源中的数据采用ETL(Extract-Transform-Load)工具以固定的周期加载到数据仓库中。 数据存储和管理:此层次主要涉及对数据的存储和管理,含数据仓库、数据集市、数据仓库检测、运行与维护工具和元数据管理等 。 数据服务:为前端和应用提供数据服务, 可直接从数据仓库中获取数据供前端应用使用,也可通过OLAP(OnLine Analytical Processing,联机分析处理)服务器为前端应用提供负责的数据服务。 数据应用:此层次直接面向用户,含数据查询工具、自由报表工具、数据分析工具、数据挖掘工具和各类应用系统。

1-2 传统数据仓库的问题
1)无法满足快速增长的海量数据存储需求,传统数据仓库基于关系型数据库,横向扩展性较差,纵向扩展有限。
2)无法处理不同类型的数据,传统数据仓库只能存储结构化数据,企业业务发展,数据源的格式越来越丰富。
3)传统数据仓库建立在关系型数据仓库之上,计算和处理能力不足,当数据量达到TB级后基本无法获得好的性能。

1-3 Hive
Hive是建立在Hadoop之上的数据仓库,由Facebook开发, 在某种程度上可以看成是用户编程接口,本身并不存储和处理数据,依赖于HDFS存储数据,依赖MR处理数据。有类SQL语言HiveQL,不完全支持SQL标准,如,不支持更新操作、索引和事务,其子查询和连接操作也存在很多限制。
Hive把HQL语句转换成MR任务后,采用批处理的方式对海量数据进行处理。数据仓库存储的是静态数据,很适合采用MR进行批处理。Hive还提供了一系列对数据进行提取、转换、加载的工具,可以存储、查询和分析存储在HDFS上的数据。

1-4 Hive与Hadoop生态系统中其他组件的关系
Hive依赖于HDFS存储数据,依赖MR处理数据 ;Pig可作为Hive的替代工具,是一种数据流语言和运行环境,适合用于在Hadoop平台上查询半结构化数据集,用于与ETL过程的一部分,即将外部数据装载到Hadoop集群中,转换为用户需要的数据格式;HBase是一个面向列的、分布式可伸缩的数据库,可提供数据的实时访问功能,而Hive只能处理静态数据,主要是BI报表数据, Hive的初衷是为减少复杂MR应用程序的编写工作,HBase则是为了实现对数据的实时访问。

1-5 Hive与传统数据库的对比


1-6 Hive的部署和应用
1-6-1 Hive在企业大数据分析平台中的应用
当前企业中部署的 大数据 分析平台,除Hadoop的基本组件HDFS和MR外,还结合使用Hive、Pig、 Hbase 、Mahout,从而满足不同业务场景需求。
上图是企业中一种常见的大数据分析平台部署框架 ,在这种部署架构中:
1) Hive和Pig用于报表中心,Hive用于分析报表,Pig用于报表中数据的转换工作。
2)HBase用于在线业务,HDFS不支持随机读写操作,而HBase正是为此开发,可较好地支持实时访问数据。
3)Mahout提供一些可扩展的机器学习领域的经典算法实现,用于创建商务智能(BI)应用程序。


二、Hive系统架构
下图显示Hive的主要组成模块、Hive如何与Hadoop交互工作、以及从外部访问Hive的几种典型方式。
Hive主要由以下三个模块组成:
1)用户接口模块,含CLI、HWI、JDBC、Thrift Server等,用来实现对Hive的访问。
CLI是Hive自带的命令行界面;HWI是Hive的一个简单网页界面;JDBC、ODBC以及Thrift Server可向用户提供进行编程的接口,其中Thrift Server是基于Thrift软件框架开发的,提供Hive的RPC通信接口。
2)驱动模块(Driver),含编译器、优化器、执行器等, 负责把HiveQL语句转换成一系列MR作业,所有命令和查询都会进入驱动模块 ,通过该模块的解析变异,对计算过程进行优化,然后按照指定的步骤执行。
3) 元数据存储模块(Metastore),是一个独立的关系型数据库,通常与MySQL数据库连接后创建的一个MySQL实例,也可以是Hive自带的Derby数据库实例。 此模块主 要保存表模式和其他系统元数据 ,如表的名称、表的列及其属性、表的分区及其属性、表的属性、表中数据所在位置信息等。

三、Hive工作原理
3-1 SQL语句转换成MapReduce作业的基本原理
3-1-1 用MapReduce实现连接操作
假设连接(join)的两个表分别是用户表User(uid,name)和订单表Order(uid,orderid),具体的SQL命令: select name,orderid from User u join Order o on u.uid=o.uid;
上图描述了连接操作转换为MapReduce操作任务的具体执行过程。
首先,在Map阶段,
1)User表以uid为key,以name和表的标记位(这里User的标记位记为1)为value,进行Map操作,把表中记录转换生成一系列KV对的形式。比如,User表中记录(1,Lily)转换为键值对(1,<1,Lily>),其中第一个“1”是uid的值,第二个“1”是表User的标记位,用来标示这个键值对来自User表;
2)同样,Order表以uid为key,以orderid和表的标记位(这里表Order的标记位记为2)为值进行Map操作,把表中的记录转换生成一系列KV对的形式;
3)接着,在Shuffle阶段,把User表和Order表生成的KV对按键值进行Hash,然后传送给对应的Reduce机器执行。比如KV对(1,<1,Lily>)、(1,<2,101>)、(1,<2,102>)传送到同一台Reduce机器上。当Reduce机器接收到这些KV对时,还需按表的标记位对这些键值对进行排序,以优化连接操作;
4)最后,在Reduce阶段,对同一台Reduce机器上的键值对,根据“值”(value)中的表标记位,对来自表User和Order的数据进行笛卡尔积连接操作,以生成最终的结果。比如键值对(1,<1,Lily>)与键值对(1,<2,101>)、(1,<2,102>)的连接结果是(Lily,101)、(Lily,102)。


3-1-2 用MR实现分组操作
假设分数表Score(rank, level),具有rank(排名)和level(级别)两个属性,需要进行一个分组(Group By)操作,功能是把表Score的不同片段按照rank和level的组合值进行合并,并计算不同的组合值有几条记录。SQL语句命令如下:
select rank,level,count(*) as value from score group by rank,level;
上图描述分组操作转化为MapReduce任务的具体执行过程。
1)首先,在Map阶段,对表Score进行Map操作,生成一系列KV对,其键为,值为“拥有该组合值的记录的条数”。比如,Score表的第一片段中有两条记录(A,1),所以进行Map操作后,转化为键值对(,2);
2)接着在Shuffle阶段,对Score表生成的键值对,按照“键”的值进行Hash,然后根据Hash结果传送给对应的Reduce机器去执行。比如,键值对(,2)、(,1)传送到同一台Reduce机器上,键值对(,1)传送另一Reduce机器上。然后,Reduce机器对接收到的这些键值对,按“键”的值进行排序;
3)在Reduce阶段,把具有相同键的所有键值对的“值”进行累加,生成分组的最终结果。比如,在同一台Reduce机器上的键值对(,2)和(,1)Reduce操作后的输出结果为(A,1,3)。


3-2 Hive中SQL查询转换成MR作业的过程
当Hive接收到一条HQL语句后,需要与Hadoop交互工作来完成该操作。HQL首先进入驱动模块,由驱动模块中的编译器解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行。执行器通常启动一个或多个MR任务,有时也不启动(如SELECT * FROM tb1,全表扫描,不存在投影和选择操作)
上图是Hive把HQL语句转化成MR任务进行执行的详细过程。
1)由驱动模块中的编译器–Antlr语言识别工具,对用户输入的SQL语句进行词法和语法解析,将HQL语句转换成抽象语法树(AST Tree)的形式;
2)遍历抽象语法树,转化成QueryBlock查询单元。因为AST结构复杂,不方便直接翻译成MR算法程序。其中QueryBlock是一条最基本的SQL语法组成单元,包括输入源、计算过程、和输入三个部分;
3)遍历QueryBlock,生成OperatorTree(操作树),OperatorTree由很多逻辑操作符组成,如TableScanOperator、SelectOperator、FilterOperator、JoinOperator、GroupByOperator和ReduceSinkOperator等。这些逻辑操作符可在Map、Reduce阶段完成某一特定操作;
3)Hive驱动模块中的逻辑优化器对OperatorTree进行优化,变换OperatorTree的形式,合并多余的操作符,减少MR任务数、以及Shuffle阶段的数据量;遍历优化后的OperatorTree,根据OperatorTree中的逻辑操作符生成需要执行的MR任务;启动
4)Hive驱动模块中的物理优化器,对生成的MR任务进行优化,生成最终的MR任务执行计划;
5)最后,有Hive驱动模块中的执行器,对最终的MR任务执行输出。
Hive驱动模块中的执行器执行最终的MR任务时,Hive本身不会生成MR算法程序。 它通过一个表示“Job执行计划”的XML文件,来驱动内置的、原生的Mapper和Reducer模块。Hive通过和JobTracker通信来初始化MR任务,而不需直接部署在JobTracker所在管理节点上执行。通常在大型集群中,会有专门的网关机来部署Hive工具,这些网关机的作用主要是远程操作和管理节点上的JobTracker通信来执行任务。Hive要处理的数据文件常存储在HDFS上,HDFS由名称节点(NameNode)来管理。 JobTracker/TaskTracker NameNode/DataNode
四、Hive HA基本原理
在实际应用中,Hive也暴露出不稳定的问题,在极少数情况下,会出现端口不响应或进程丢失问题。Hive HA(High Availablity)可以解决这类问题。
在Hive HA中,在Hadoop集群上构建的数据仓库是由多个Hive实例进行管理的,这些Hive实例被纳入到一个资源池中,由HAProxy提供统一的对外接口。客户端的查询请求,首先访问HAProxy,由HAProxy对访问请求进行转发。HAProxy收到请求后,会轮询资源池中可用的Hive实例,执行逻辑可用性测试。
1)如果某个Hive实例逻辑可用,就会把客户端的访问请求转发到Hive实例上;
2)如果某个实例不可用,就把它放入黑名单,并继续从资源池中取出下一个Hive实例进行逻辑可用性测试。
3)对于黑名单中的Hive,Hive HA会每隔一段时间进行统一处理,首先尝试重启该Hive实例,如果重启成功,就再次把它放入资源池中。
由于HAProxy提供统一的对外访问接口,因此,对于程序开发人员来说,可把它看成一台超强“Hive”。
大数据
2019-09-23 20:04:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
RDD是spark的核心,也是整个spark的架构基础,RDD是弹性分布式集合(Resilient Distributed Datasets)的简称, 是分布式只读且已分区集合对象 。这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。具有自动容错、位置感知调度和可伸缩性,而容错性是最难实现的,大多数分布式数据集的 容错性有两种方式:数据检查点和记录数据的更新 。对于大规模数据分析系统,数据检查点操作成本高,主要原因是大规模数据在服务器之间的传输带来的各方面的问题,相比记录数据的更新,RDD也只支持粗粒度的转换,也就是记录如何从其他RDD转换而来(即lineage),以便恢复丢失的分区。
简而言之,特性如下:
1. 数据结构不可变
2. 支持跨集群的分布式数据操作
3. 可对数据记录按key进行分区
4. 提供了粗粒度的转换操作
5. 数据存储在内存中,保证了低延迟性
大数据
2019-09-23 19:02:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
kafka的相对offset
每个分区的文件存储在各自分区的目录中。
每个分区的offset都从1开始。
每个分区将数据切分成多个segment(段), 每个段对应了两个文件:log和index
第一组命名为:00000000000000000000.index和00000000000000000000.log 。
从第二组开始, 文件名是上一组最后一条消息的offset。
log文件是以追加的方式存储。index文件存储的是稀疏索引, 并没有存储所有日志的偏移量。
如何寻找某offset的消息?
如, 寻找offset为2019的消息: 二分法, 寻找到文件名<=2019的最大segment文件。
如找到了名为2000的文件。 2019-2000=19。在对应的index文件中寻找<=19的最大相对offset。
如,找到了15,300 去对应的log文件中寻找到300pos(第15条), 并按顺序读到第19条消息即可。
大数据
2019-09-22 19:06:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
工业互联网和 5G 时代正逐步到来,万物智能网联、智能互联已成为一种趋势。目前物联网应用在很多场景下,如智慧制造、智慧城市、智慧农业等。本文以智慧城市为背景,介绍西安中服软件有限公司是如何利用大数据分析神兽 Apache Kylin,让物联网的传感器信息,通过云化的大数据物联网云平台,对感知的数据进行分析处理,进而满足智慧城市的建设需求。

业务背景
作为智慧城市的重要组成部分,智慧楼宇也以惊人的速度发展着,挖掘和处理楼宇数据,并以此来精准控制并降低楼宇能耗成为一个重要问题。以 IoT 设备完成能源网络搭建,并进行数据采集,然后推送到流处理工具,楼宇管理者可以通过常规的数据分析,根据不同的用能区域、能耗类型,对建筑能耗进行分时段计量和分项计量,分别计算指定时间范围内电、水、气等能源的使用量,并且对未来能耗使用进行一定程度的预测。管理者可通过梳理不同能源的使用情况,实现对能源的高效管理。
做一个单点的能源管理系统并不存在技术难点,但是在设备上云后,要将其做成一个 Saas 产品,我们就需要考虑很多问题,如性能问题等。 传统的做法,不同的租户将物联网设备在云端绑定后,系统自动为他们生成对应的数据库表,以此来做到资源隔离。这些物联设备的事实数据和维度数据都存放于 Oracle 中,但是使用关系型数据库存在一些问题: 无法支撑大数据量,当数据量达到一定程度时,需要进行倒库。 为了解决查询的性能问题,往往需要创建多张中间表,去存放按属性查询过的历史数据结果。 用户在修改设备的配置表时,涉及到多表联动修改。这种处理方法不仅可靠性低,并且对开发人员和数据分析人员来说都是极为复杂的。
因此,需要一个能支持快速查询且易维护的数据存储及分析引擎,来解决关系型数据库存在的这些瓶颈。

技术选型
我们团队在引入 Kylin 之前,曾经尝试过很多种解决方案。
第一种方法是将所有的数据都加载到 Hive 中,这种方式虽然能够支撑较大数据量,但查询时需要等待很长时间才能获取到结果,查询延时无法满足即时性要求。
我们尝试过使用 Druid。Druid 已经是目前市面上较为成熟的实时 OLAP 引擎,但是它只支持单表查询,而我们的能耗分析通常需要多表联查。此外,Druid 的原生查询语句是一种自定义的 json 格式,并非我们所熟悉的 SQL,这对于非专业人员来说上手存在一定的难度。
后来,我们使用 Kylin 进行能耗分析测试,在这过程中我们发现它有以下几大特点:百亿数据集支持、SQL 支持、准实时、亚秒级响应,对外提供 REST API 和JDBC/ODBC 两种访问方式。Kylin 架构如下图所示。这些特点能够满足智慧楼宇能效查询的大部分场景,因此我们最终决定选用 Apache Kylin 作为中服楼宇能耗管理平台的存储引擎和查询引擎。
楼宇能耗 OLAP 架构
中服的楼宇能耗 OLAP 架构如下图所示,考虑到设备传感器采集到的数据并非每次都是完整的,因此需要流处理工具提前做适配、计算、校验以及转换成 json 格式,最终输入到 Kafka 指定的 topic 中。该架构中使用的 Kylin 版本为 2.6.2,此版本相对稳定,且修改了 Kafka 对接 Kylin 会出现 8 个小时时差的错误,保证了在几个衍生时间维度上的准确性。
Kylin的事实表是来自 Kafka topic 里的设备传感器的json 数据(不同设备类型的数据通过流处理工具进入到不同的 topic 中),维度表是存放于 Hive 中用户定义的配置表,例如设备表、设备区域表等,事实表与维度表连接构成星型模型。数据分析工程师根据可能要查询到的场景来分析这些表中的维度和度量,以此来配置相应水、电、气的 Model 和 Cube,再依托于 MR 进行 Cube 的构建。
根据业务场景,整个流数据是源源不断的,需要每小时 build 一次Cube,待 Cube 构建完成,对外提供两种方式查询能耗数据,一种采用 JDBC/REST API 的方式为前端能耗管理界面提供数据,另一种是采用 JDBC 的方式无缝对接 BI 工具,用以能耗的 Display 和 Dashboard 展示。

使用 Kylin 实现能源管理 SaaS 平台
我们的初衷是做一个能耗管理云平台,通过对实际需求进行详细的分析,在用户将物联网设备上云后,系统能够自动协助该设备绑定至特定类型的 Cube 中,比如能耗管理场景下某个区域的电表会自动绑定到 IoT_Electric_Cube中。换句话说,用户只关心将自己的设备连上,再到可视化面板查看相关能耗使用情况,不需要知道背后的数据分析引擎具体是怎么操作的。因此,我们需要解决两个问题:Cube 的自动化创建以及多租户的问题。
1. Kylin 各模块的自动化创建
由于使用 Kylin 有一定的门槛,需要进行一些专业的分析和操作。而使用我们云平台的用户更多的只希望在完成设备绑定之后,就能查询到相关能耗信息,此外,从一定程度上来说,大部分用户并未掌握 Kylin 的使用方法,因此如果让用户根据绑定的设备在线去配置Model、Cube,将会成为一个很大的难题。为了解决该问题,我们进行了下图的配置。
我们针对能效分析场景中的水、电、气、油等能耗类型,提前创建好一个预置的Cube,后期用户在绑定设备后,系统会根据预置的 Cube 和新的设备维度,分析出常量维度、特殊维度以及度量,生成新的Cube 描述信息,用以自动化创建 Model 以及 Cube。另外,针对于用户可能更改设备配置表的内容,我们对其提供了一个 API,可跟随配置表的内容动态更改 Kylin 维度表中的数据。
2. 平台多租户的资源隔离
针对于平台的多租户问题,由于 Kylin 目前对多租户的支持较为简单,只是简单将用户分成了几个角色,不同角色的用户对元数据及 Cube 有不同的操作权限,而我们平台的租户只需在查询时保证相互之间数据的独立性即可,因此我们只做资源数据的上层隔离,对下面的 HBase 存储来说还是统一的。我们的做法就是在 Project 层做处理,一个租户创建一个 Project,再根据 Project 对应绑定不同 DataSource、Model 以及 Cube,再由整个平台的管理者对所有 Project进行管理。但如果租户量达到一定量时,此种方法并不可取,目前我们正积极针对多租户的问题做相应的优化。
此外,多租户机制下会产生大量的垃圾数据,包括Kylin 在构建 Cube 过程中会在 HDFS 上生成中间数据,以及在执行 drop/purge/merge 等操作时,HBASE 的表可能不会及时地被 Kylin 清理,一直保留在 HBASE 中,久而久之对 HBASE 的运维查询都必将造成一定的压力。因此,我们做了一些调整,每隔一段时间执行一些清理工作,以保证各集群的正常工作。

Cube 维度优化
随着维度数目的增加,Cuboid 的数量成指数级增长。针对能效分析场景下水、电、气各个 Cube,以电为例,电表采集点有 A 相电流、B 相电流、C 相电流、AB 相电流等近 40 个维度,以及正向有功电度和、正向无功电度和等4个度量,这么庞大的维度指标如果不做维度优化,将会有 2^40 个 Cuboid,Kylin 会提示无法创建该 Cube。
为了缓解 Cube 的构建压力,Kylin 提供了 Cube 的高级设置。这些高级设置包括聚合组(Aggregation Group)、联合维度(Joint Dimension)、层级维度(Hierarchy Dimension)和强制维度(Mandatory Dimension)等。通过对电表设备的维度进行分析,最终确定了 2 个强制维度,电压、电流等多项常量维度组合成一组联合维度,year_start、month_start 等时间衍生维度组合成一组联合维度,正向有功电度等特殊维度不做处理。优化后的维度配置如下图所示。
同时,我们也尝试过将常量维度和特殊维度拆分开,独立构建不同的 Cube 并对外提供查询,但考虑到多租户场景下要维护多个 Cube,因此放弃了这种做法。
对维度进行合理配置,可有效对 Cuboid 进行剪枝优化,减少不必要的资源浪费,筛选出真正需要的 Cuboid,有效降低构建时间,提高集群资源的利用效率。我们根据 Kylin 的高级配置对维度进行配置后,虽然牺牲了某些场景的查询性能,但是极大的优化了聚合的性能。以下是优化之后的性能指标(一个小时 Build):

运维监控
由于多租户情况下,存在并行的 Build Cube 操作,那么就可能存在 Cube 构建失败的情况,从而导致数据分析结果不准确。为了避免这种情况,我们做了两部分工作:在 Job 构建成功时会在系统日志中打印出相关信息;Job 构建失败时会及时给运维人员发送短信通知,哪个租户的哪个 Cube 构建时出现错误,以便运维人员能够快速的定位构建失败的 Job Name 并解决相关问题。

总结
针对 SaaS 平台下物联网的能耗管理问题,使用 Kylin 作为 OLAP 引擎,有诸多优势。 对开发人员来说,在协同开发时,Kylin 工程师只需定义好各模块的自动化创建 API,如创建 Project、绑定 Kafka DataSource、同步 Hive 维度表、创建 Model、创建 Cube等,其他部门按一定规则调用就可完成 SaaS 形式的系统创建。此外,开发人员不再需要担心库满而进行倒库操作。 数据分析师使用 Davinci 创建可视化面板时,以 Kylin 作为分析引擎,实现海量数据分析结果的实时展示。 对于用户而言,通过前端查看相关数据时,对比传统关系型数据库查询方式,Kylin 能够以更低的延时呈现出数据分析的结果。
这里仅仅展示的是 Kylin 赋能物联网的一个应用场景,我们后期将继续探索更多的应用可能。

了解更多大数据资讯,点击进入Kyligence官网
大数据
2019-09-13 00:02:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
Zookeeper中写操作都有维护一个事务日志,专门用于写znode操作。只有事务日志确认过的数据变更才会在整个集群生效。
1、TxnLog
TxnLog是写事务日志接口,主要包含以下接口: rollLog: 日志滚动,启动新日志 append:添加写事务到日志尾部 getLastLoggedZxid:读取最后记录的zxid truncate:截断写事务,用在Learner事务比Leader事务多的场景 commit:提交事务,确认事务已持久化
rollLog方法提供日志滚动功能,如果任随事务日志文件无限增长,势必影响到性能,rollLog方法会重新启动新的事务日志文件名,后续事务写到新的文件中,ZooKeeper中多个事务日志文件通常会以zxid来区别。
Truncate方法提供截断日志功能,将zxid之后的事务全部删除。通常当Follower的事务日志比Leader还多的时候会触发改方法,保证Follower和Leader的数据库同步。
Commit方法提交文件缓存到磁盘,确保事务真实写到磁盘而不是仅仅存在于文件内存缓存中。
2、实现类FileTxnLog
ZooKeeper中实现TxnLog接口的类是FileTxnLog,它的主要功能和方法包括以下这些。
2.1 append方法
主要代码: logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); logStream.flush(); currentSize = fos.getChannel().position(); streamsToFlush.add(fos); padFile(fos); Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);
该方法将事务添加到日志文件的尾部。
日志没滚动前,写到当前日志文件;日志回滚的话写到新的日志文件,新日志文件的名称是”log.”加当前zxid的值。
2.2 rollLog方法
日志滚动,关闭旧的日志文件,启动新的日志文件,主要代码: if (logStream != null) { this.logStream.flush(); this.logStream = null; }
2.3 getLastLoggedZxid方法
从日志文件中获取最近记录的zxid的值,从lastLoggedZxid就能获得最新的事务日志文件。多个日志文件的情况会遍历所有文件,选出文件名中zxid最大的那个日志文件,再从该日志文件中取到最大的zxid。
主要代码: public long getLastLoggedZxid() { File[] files = getLogFiles(logDir.listFiles(), 0); long zxid = maxLog; TxnIterator itr = null; FileTxnLog txn = new FileTxnLog(logDir); itr = txn.read(maxLog); while (true) { if(!itr.next()) break; TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } return zxid; }

2.4 truncate方法
截断多余的日志信息,保证日志文件是合法有效的。
主要代码: public boolean truncate(long zxid) throws IOException { FileTxnIterator itr = null; itr = new FileTxnIterator(this.logDir, zxid); PositionInputStream input = itr.inputStream; long pos = input.getPosition(); RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw"); raf.setLength(pos); raf.close(); return true; }

2.5 commit方法
确认提交日志文件缓存,主要代码: public synchronized void commit() throws IOException { for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { long startSyncNS = System.nanoTime(); log.getChannel().force(false); } } }
大数据
2019-09-09 19:20:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
笔者为什么要讲ZooKeeper的源码,对于程序员来说,光知道用是成为不了优秀的行家的,还要知道所以然,只有知道了内部实现机制,才能开拓眼界提高自我。而笔者认为ZooKeeper是最好的入门分布式系统的敲门砖。
好不多说,我们这里先看看客户端是怎么运转的。
1、概述
ZooKeeper客户端是连接到服务端集群,获取zk节点数据,监听zk节点数据变化的。zk节点就是znode,它是类似文件路径的东西,每个znode可以设置它的文本内容,当znode的文本内容被其他客户端修改后,所有监听该znode的客户端都会实时被通知到,这样的方式实现了分布式一致性存储。
在客户端里有一个ZooKeeper类,注意这里特指类名称。客户端通过Zookeeper类来发送命令给Zookeeper服务器。
ZooKeeper类中还可以设置Watcher,这就是znode监听者。Watcher可以指定监听哪个znode,当Zookeeper集群的znode节点状态发生变化时,服务端会发送通知消息给客户端的Watcher。
Watcher又可以细分为3种Watcher子类:DataWatcher,ExistWatcher和ChildWatcher。根据字面意思就能猜得出来,DataWatcher是znode的数据变化时触发,ExistWatcher是znode的创建删除时触发,ChildWatcher是在znode下创建子目录(也是znode)时触发。实际生产环境中用的最多的还是DataWatcher。
下面我们先分析分析ZooKeeper类的实现,至于Watcher的实现后面会有专门介绍。
2、通信机制
客户端与服务端交互的数据流大致如下:
首先是客户端ZooKeeper类发起命令请求,然后通过ClientCntx发送给服务端集群。ClientCnxn是上层类,屏蔽了具体的网络通信流程,网络通过是ClientCntxSocketNetty实现的,服务端是ZooKeeperServer。 以create命令(创建znode)为例,ZooKeeper类会构造Packet,将请求数据封装在Packet里。然后调用ClientCnxn的submitRequest方法。ClientCnxn的submitRequest方法调用queuePacket方法将Packet放入outgoingQueue队列中,然后线程执行wait方法挂起等待服务端返回。 ClientCnxnSocketNetty和ClientCnxn共享同一个outgoingQueue,ClientCnxnSocketNetty启动了发送守护进程,当outgoingQueue队列中有Packet时,会自动将该Packet发送给ZooKeeperServer。同时ClientCnxnSocketNetty启动接收线程实时接收ZooKeeperServer的返回数据,返回数据触发ClientCnxnSocketNetty中启动的ZKClientHandler的MessageReceived事件。 在MessageReceived事件中回调ClientCnxn中的SendThread类的readResponse方法。 readResponse方法中最后调用finishPacket方法唤醒在该Packet上wait的线程,也就是发起submitRequest的方法,使得submitRequest方法返回到ZooKeeper类。 客户端请求过程结束。
3、ZooKeeper类
客户端在构造函数阶段创建ClientCnxn 与服务端连接,后续命令都通过ClientCntx发送给服务端。ClientCnxn是客户端与服务端通信的底层接口,它和ClientCnxnSocketNetty一起工作提供网络通信服务。
服务端是ZooKeeperServer类,收到ClientCnxn的Request请求包后调用相应的处理逻辑,返回结果再通过ClientCnxc发送给客户端。
ClientCntx连接时可以同时指定多台服务器地址,根据一定的算法挑选某一个服务器地址进行连接,当某个服务器发生故障无法连接时,会自动连接其他的服务器。实现这一机制的是HostProdiver接口,实现用StaticHostProvider类。
ZooKeeper类的构造函数如下: public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { if (clientConfig == null) { clientConfig = new ZKClientConfig(); } this.clientConfig = clientConfig; watchManager = defaultWatchManager(); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); hostProvider = aHostProvider; cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
这里的connectString是连接字符串,aHostProvider是管理服务端列表的。watcher是监听器。
为什么有aHostProvider?客户端可以配置多个服务端地址,这样当某个服务端挂掉的时候,客户端会自动尝试连接其他的服务端,实现分布式可靠性。
创建了ZooKeeper对象后就可以调用具体的读写数据的方法了,下面列举常见方法的实现机制。
create方法根据输入参数构造出CreateRequest包,然后通过submitRequest方法传递给服务端,submitRequest方法将CreateRequest转换成Packet包并调用sendPacket方法将发送包放入队列,等待发送线程发送给服务端。
服务端响应完成后会将返回结果填充到CreateResponse实体中返回给客户端。
4、发送命令
我们选取getData方法,来看看客户端的内部机制,其他命令的处理过程是类似的,不同的只是命令类型不同而已。
getData方法从服务端读取znode的数据,入参同时包括watcher,这样在znode数据被其他客户端修改后,会实时回调watcher来使得所有客户端同步本次变化。
先给出getData的代码: public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
这里干了几件事情呢?主要干了两件事。
(1)注册watcher,这个很好理解,至于watcher的细节会在其他文章里专栏叙说。
(2)构建完整的znode的路径名,从根目录开始。然后将znode的路径名和GetDataRequest类型打包放到ClientCnxn的发送队列里,等待排队发往服务端。
其他命令的处理过程是类似的,不同的只是命令类型不同而已,对应到代码里是不同的Request对象。getData命令对应GetDataRequest类;Exists方法对应ExistsRequest类。他们的父类却是同一个。ZooKeeper支持的Request类主要有以下这些: create:CreateRequest delete:DeleteRequest exists:ExistsRequest getData:GetDataRequest setData:SetDataRequest getChildren:GetChildrenRequest
对于create命令来说,和GetData有一点不同。不同点在于以下两点:
(1)create命令是立即返回结果的,而getData等其他命令是异步返回结果的。getData入参里的DataCallback参数就是异步回调处理方法。
(2)create是调用ClientCnxn的submitRequest方法启动发送命令过程,而getData等其他方法是调用ClientCnxn的queuePacket方法将请求命令缓存在队列里,等待发送线程异步发送。

5、ClientCnxn
前面我们看到ZooKeeper类的命令发送都是通过ClientCnxn类实现的。这里就谈谈ClientCnxn类干了哪些活。
Clientcnxn将客户端请求加入发送队列,等待sendThread发送。eventThread负责处理Server返回的WatchedEvent,以及回调注册的客户端事件接口处理函数。
5.1 queuePacket
这是ClientCnxn里最重要的一个方法,它将请求包放入发送队列outgoingQueue中,等待发送线程发送给服务端。 public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration atchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null; packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().packetAdded(); return packet; }
最后告诉SendThread数据已经放好了,至于何时发送就等SendThread自己来决定了。
5.2 submitRequest
提交客户端请求到服务端,这是立即返回的方法,如果请求包没处理完则一直等待下去。submitRequest方法主要用在create命令。 ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r;
5.3 sendPacket
sendPacket构建Packet,然后调用发送线程SendThread里的同名sendPacket方法来发送数据到服务端。 public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException { int xid = getXid(); RequestHeader h = new RequestHeader(); h.setXid(xid); h.setType(opCode); ReplyHeader r = new ReplyHeader(); r.setXid(xid); Packet p = new Packet(h, r, request, response, null, false); p.cb = cb; sendThread.sendPacket(p); }
5.4 finishPacket
该方法在接收到服务端的响应时,唤醒等待响应的客户端线程,通过调用Packet的notifyAll方法来唤醒wait在该Packet上的线程。
如果客户端请求指定了Watcher,则同时生成WatchedEvent事件并放入事件队列,等待eventThread线程处理。
代码片段: private void finishPacket(Packet p) { if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
5.5 readResponse
ClientCnxnSocketNetty收到服务端响应后触发ZKClientHandler的messageReceived事件,在该事件处理逻辑中调用sendThread的readResponse方法获取服务端响应。
如果服务端响应的是WatchedEvent事件,则将事件放入eventThread中等候调度执行事件方法。
如果服务端响应的是客户端命令结果,则将Packet从发送队列删除,最后调用CientCnxn的finishPacket方法完成最后的处理,finishPacket方法在前面已经讲过了。
readResponse的主要代码如下: void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } Packet packet; try { packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } } finally { finishPacket(packet); } }
大数据
2019-09-09 13:21:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
今天看了一遍不错的文章,关于Java访问Hive的,正好要用到这一块,分享到此以便更多的人可以学习和应用
非常感谢博主的总结和分享
博文链接: https://www.jianshu.com/p/4ef28607fc04
Hive内置服务与HiveServer2应用
内置服务介绍
我们执行 hive --service help 查看内置的服务帮助,图中的Service List右侧罗列了很多Hive支持的服务列表,种类很多。
下面介绍最有用的一些服务:
(1)cli
cli是Command Line Interface 的缩写,是Hive的命令行界面,用的比较多,是默认服务,直接可以在命令行里使用。
(2)hiveserver
这个可以让Hive以提供Thrift服务的服务器形式来运行,可以允许许多个不同语言编写的客户端进行通信,使用需要启动HiveServer服务以和客户端联系,我们可以通过设置HIVE_PORT环境变量来设置服务器所监听的端口,在默认情况下,端口号为10000。
我们可以使用如下的指令启动该服务:hive --service hiveserver -p 10002,其中-p参数也是用来指定监听端口的。
(3)hwi
其实就是hive web interface的缩写它是hive的web借口,是hive cli的一个web替代方案。
(4)jar
与hadoop jar等价的Hive接口,这是运行类路径中同时包含Hadoop 和Hive类的Java应用程序的简便方式。
(5)metastore
在默认的情况下,metastore和hive服务运行在同一个进程中,使用这个服务,可以让metastore作为一个单独的进程运行,我们可以通过METASTOE——PORT来指定监听的端口号。
Hive的三种启动方式
hive shell模式
bin/hive 或者 bin/hive –-service cli
hive web界面启动模式
bin/hive –-service hwi & , & 表示后台运行。我们后台启动hwi服务,然后输入jps查看进程发现多了一个RunJar,表明我们的hive hwi启动成功。
用于通过浏览器来访问hive,感觉没多大用途,浏览器访问地址是: http://huatec01:9999/hwi/
启动示意图:
浏览器访问:

hive远程服务 (端口号10000) 启动方式
bin/hive --service hiveserver2 &
用java,python等程序实现通过jdbc等驱动的访问hive就用这种起动方式了,这个是程序员最需要的方式了。

HiveServer与HiveServer2
HiveServer2介绍
HiveServer与HiveServer2,两者都允许远程客户端使用多种编程语言,通过HiveServer或者HiveServer2,客户端可以在不启动CLI的情况下对Hive中的数据进行操作,连这个和都允许远程客户端使用多种编程语言如java,python等向hive提交请求,取回结果。
官方说明: HiveServer is scheduled to be removed from Hive releases starting Hive 0.15. See HIVE-6977. Please switch over to HiveServer2.
从hive0.15起就不再支持hiveserver了(我的hive版本为2.1.1),但是在这里我们还是要说一下hiveserver,其实在前面的Server List中就不包含hiveserver。
我们也可以尝试执行 bin/hive –-service hiveserver ,会输出日志提示 Service hiveserver not found 。
HiveServer或者HiveServer2都是基于Thrift的,但HiveSever有时被称为Thrift server,而HiveServer2却不会。既然已经存在HiveServer,为什么还需要HiveServer2呢?
这是因为HiveServer不能处理多于一个客户端的并发请求,这是由于HiveServer使用的Thrift接口所导致的限制,不能通过修改HiveServer的代码修正。因此在Hive-0.11.0版本中重写了HiveServer代码得到了HiveServer2,进而解决了该问题。HiveServer2支持多客户端的并发和认证,为开放API客户端如JDBC、ODBC提供更好的支持。
HiveServer与HiveServer2的区别
Hiveserver和hiveserver2的JDBC区别: HiveServer version Connection URL Driver Class HiveServer2 jdbc:hive2://: org.apache.hive.jdbc.HiveDriver HiveServer jdbc:hive://: org.apache.hadoop.hive.jdbc.HiveDriver
HiveServer2的配置
Hiveserver2允许在配置文件hive-site.xml中进行配置管理,具体的参数为: hive.server2.thrift.min.worker.threads– 最小工作线程数,默认为5。 hive.server2.thrift.max.worker.threads – 最小工作线程数,默认为500。 hive.server2.thrift.port– TCP 的监听端口,默认为10000。 hive.server2.thrift.bind.host– TCP绑定的主机,默认为localhost
我们可以在hive-site.xml文件中搜索“hive.server2.thrift.min.worker.threads”属性(hive-site.xml文件配置属性达到5358行,太长了,建议搜索),然后进行编辑,示例如下:

从Hive-0.13.0开始,HiveServer2支持通过HTTP传输消息,该特性当客户端和服务器之间存在代理中介时特别有用。与HTTP传输相关的参数如下: hive.server2.transport.mode – 默认值为binary(TCP),可选值HTTP。 hive.server2.thrift.http.port– HTTP的监听端口,默认值为10001。 hive.server2.thrift.http.path – 服务的端点名称,默认为 cliservice。 hive.server2.thrift.http.min.worker.threads– 服务池中的最小工作线程,默认为5。 hive.server2.thrift.http.max.worker.threads– 服务池中的最小工作线程,默认为500。
我们同理可以进行搜索,然后进行配置。
启动HiveServer2
启动Hiveserver2有两种方式,一种是上面已经介绍过的 hive --service hiveserver2 ,另一种更为简洁,为 hiveserver2 。
我们采用第二种方式启动hiveserver2,如下图所示:
启动后hiveserver2会在前台运行,我们开启一个新的SSH链接,使用jps查看会发现多出一个RunJar进程,它代表的就是HiveServer2服务。
使用 hive--service hiveserver2 –H 或 hive--service hiveserver2 –help 查看帮助信息。
默认情况下,HiveServer2以提交查询的用户执行查询(true),如果hive.server2.enable.doAs设置为false,查询将以运行hiveserver2进程的用户运行。为了防止非加密模式下的内存泄露,可以通过设置下面的参数为true禁用文件系统的缓存 fs.hdfs.impl.disable.cache – 禁用HDFS文件系统缓存,默认值为false。 fs.file.impl.disable.cache – 禁用本地文件系统缓存,默认值为false。
浏览器查看 http://huatec01:10002 ,如下图所示:

配置和使用HiveServer2
配置坚挺端口和路径 hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'. hive.server2.thrift.bind.host huatec01 Bind host on which to run the HiveServer2 Thrift service.
第一个属性默认即可,第二个将主机名改为我们当前安装hive的节点。
设置impersonation
这样hive server会以提交用户的身份去执行语句,如果设置为false,则会以起hive server daemon的admin user来执行语句。 hive.server2.enable.doAs true Setting this property to true will have HiveServer2 execute Hive operations as the user making the calls to it.
我们将值改为 true 。
hiveserver2节点配置
Hiveserver2已经不再需要hive.metastore.local这个配置项了,我们配置hive.metastore.uris,如果该属性值为空,则表示是metastore在本地,否则就是远程。 hive.metastore.uris Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
默认留空,也就是metastore在本地,使用默认即可。
如果想要配置为远程的话,参考如下: hive.metastore.uris thrift://xxx.xxx.xxx.xxx:9083
zookeeper配置 hive.support.concurrency true Whether Hive supports concurrency control or not. A ZooKeeper instance must be up and running when using zookeeper Hive lock manager hive.zookeeper.quorum huatec03:2181,huatec04:2181,huatec05:2181 List of ZooKeeper servers to talk to. This is needed for: 1. Read/write locks - when hive.lock.manager is set to org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, 2. When HiveServer2 supports service discovery via Zookeeper. 3. For delegation token storage if zookeeper store is used, if hive.cluster.delegation.token.store.zookeeper.connectString is not set 4. LLAP daemon registry service
属性1设置支持并发,属性2设置Zookeeper集群。
注意:没有配置hive.zookeeper.quorum会导致无法并发执行hive ql请求和导致数据异常。
hiveserver2的Web UI配置
Hive 2.0 以后才支持Web UI的,在以前的版本中并不支持。 hive.server2.webui.host 0.0.0.0 The host address the HiveServer2 WebUI will listen on hive.server2.webui.port 10002 The port the HiveServer2 WebUI will listen on. This can beset to 0 or a negative integer to disable the web UI
默认即可,我们通过浏览器访问: http://huatec01:10002 即可访问hiveserver2,这个前面已经试过了。
启动服务
启动metastore bin/hive --service metastore &
启动hiveserver2 bin/hive --service hiveserver2 &
WebUI: http://huatec01:10002
使用beeline控制台控制hiveserver2
首先我们必须启动metastore和hiveserver2
然后启动beeline bin/beeline
尝试连接metastore: !connect jdbc:hive2://huatec01:10000 root root
如下图表明连接成功!
beeline错误1
beeline连接hiveserver2失败,报错如下: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: master is not allowed to impersonate hive (state=,code=0)
解决方法: 关闭hadoop集群 修改core-site.xml文件,增加如下内容: hadoop.proxyuser.hadoop.groups root Allow the superuser oozie to impersonate any members of the group group1 and group2 hadoop.proxyuser.hadoop.hosts huatec01,127.0.0.1,localhost The superuser can connect only from host1 and host2 to impersonate a user
注意所有节点的core-site.xml都修改。 重启hadoop集群 启动metastore和hiveserver2,重新连接hiveserver2。
beeline错误2
beeline连接hiveserver2成功,但是执行sql语句报错,错误如下: 0: jdbc:hive2://huatec01:10000> show databases; Error: java.io.IOException: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: ${system:user.name%7D (state=,code=0)
解决方法: 修改hive-site.xml中的 hive.exec.local.scratchdir 属性值。将 ${system:user.name} 改为 ${user.name} ,如下所示: hive.exec.local.scratchdir /huatec/apache-hive-2.1.1-bin/tmp/${user.name} Local scratch space for Hive jobs
重新使用beeline连接hiveserver2,执行sql语句,如下图所示:
Java编程操作MetaStore
用java,python等程序实现通过jdbc等驱动的访问hive,这需要我们启动hiveserver2。如果我们能够使用beeline控制hiveserver2,那么我们毫无疑问是可以通过Java代码来访问hive了。
如果beeline控制hiveserver2出现错误,也无法执行sql,那么请先解决这方面的错误,然后再进行代码编程。
准备工作
新建maven java app项目,然后添加Hive依赖,我们编写junitc俄式代码,所以也添加junit依赖,如下所示: junit junit 4.12 org.apache.hive hive-jdbc 2.1.1
编写测试类
完整的类代码如下: package com.huatec.hive; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.sql.*; /** * Created by zhusheng on 2018/1/2. */ public class HiveJDBC { private static String driverName = "org.apache.hive.jdbc.HiveDriver"; private static String url = "jdbc:hive2://huatec01:10000/hive_jdbc_test"; private static String user = "root"; private static String password = "root"; private static Connection conn = null; private static Statement stmt = null; private static ResultSet rs = null; // 加载驱动、创建连接 @Before public void init() throws Exception { Class.forName(driverName); conn = DriverManager.getConnection(url,user,password); stmt = conn.createStatement(); } // 创建数据库 @Test public void createDatabase() throws Exception { String sql = "create database hive_jdbc_test"; System.out.println("Running: " + sql); stmt.execute(sql); } // 查询所有数据库 @Test public void showDatabases() throws Exception { String sql = "show databases"; System.out.println("Running: " + sql); rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getString(1)); } } // 创建表 @Test public void createTable() throws Exception { String sql = "create table emp(\n" + "empno int,\n" + "ename string,\n" + "job string,\n" + "mgr int,\n" + "hiredate string,\n" + "sal double,\n" + "comm double,\n" + "deptno int\n" + ")\n" + "row format delimited fields terminated by '\\t'"; System.out.println("Running: " + sql); stmt.execute(sql); } // 查询所有表 @Test public void showTables() throws Exception { String sql = "show tables"; System.out.println("Running: " + sql); rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getString(1)); } } // 查看表结构 @Test public void descTable() throws Exception { String sql = "desc emp"; System.out.println("Running: " + sql); rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getString(1) + "\t" + rs.getString(2)); } } // 加载数据 @Test public void loadData() throws Exception { String filePath = "/home/hadoop/data/emp.txt"; String sql = "load data local inpath '" + filePath + "' overwrite into table emp"; System.out.println("Running: " + sql); stmt.execute(sql); } // 查询数据 @Test public void selectData() throws Exception { String sql = "select * from emp"; System.out.println("Running: " + sql); rs = stmt.executeQuery(sql); System.out.println("员工编号" + "\t" + "员工姓名" + "\t" + "工作岗位"); while (rs.next()) { System.out.println(rs.getString("empno") + "\t\t" + rs.getString("ename") + "\t\t" + rs.getString("job")); } } // 统计查询(会运行mapreduce作业) @Test public void countData() throws Exception { String sql = "select count(1) from emp"; System.out.println("Running: " + sql); rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getInt(1) ); } } // 删除数据库 @Test public void dropDatabase() throws Exception { String sql = "drop database if exists hive_jdbc_test"; System.out.println("Running: " + sql); stmt.execute(sql); } // 删除数据库表 @Test public void deopTable() throws Exception { String sql = "drop table if exists emp"; System.out.println("Running: " + sql); stmt.execute(sql); } // 释放资源 @After public void destory() throws Exception { if ( rs != null) { rs.close(); } if (stmt != null) { stmt.close(); } if (conn != null) { conn.close(); } } }
需要注意的是,因为hive默认只有一个数据库 default ,从前面的beeline访问hiveserver2的时候我们也可以看出。如果我们需要对默认数据库进行操作的话,我们的数据库连接为: private static String url = "jdbc:hive2://huatec01:10000/default";
这里我写了一个创建数据库的测试方法,其它的Sql操作都是基于该数据库的,所以我修改我的数据库连接为我新建的数据库。 private static String url = "jdbc:hive2://huatec01:10000/hive_jdbc_test";
测试函数比较多,我本地进行了测试都是可以成功的,我选取其中的 createTable 测试函数为例,截图如下:


作者:Jusen
链接:https://www.jianshu.com/p/4ef28607fc04
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
大数据
2019-09-06 15:27:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
在我们平常的生活工作中,百度、谷歌这些搜索网站已经成为了我们受教解惑的学校,俗话说得好,“有问题找度娘”。那么百度是如何在海量数据中找到自己需要的数据呢?为什么它搜索的速度如此之快?我们都知道是因为百度的搜索引擎,那么搜索引擎到底是个什么东西呢?可能有的程序员会想到es,但是es并不能代表搜索引擎,它只是其中的一种工具,不过这种工具确实好用,效率很高。
本文会向大家讲述搜索引擎的基本知识以及中文分词的一些方法、然后会做一个小的demo来尝试数据检索。让大家初步了解搜索引擎的实现。
一、搜索引擎介绍
1.1 搜索引擎是什么
这里引用百度百科的介绍: 搜索引擎(Search Engine)是指根据一定的策略、运用特定的计算机程序从互联网上搜集信息,在对信息进行组织和处理后,为用户提供检索服务,将用户检索相关的信息展示给用户的系统。
1.2 搜索引擎分类
搜索引擎包括全文索引、目录索引、元搜索引擎、垂直搜索引擎、集合式搜索引擎、门户搜索引擎与免费链接列表等。
本文主要介绍全文索引,即百度使用的搜索引擎分类。
全文索引
首先是数据库中数据的搜集,搜索引擎的自动信息搜集功能分两种: 一种是定期搜索,即每隔一段时间(比如Google一般是28天),搜索引擎主动派出“蜘蛛”程序,对一定IP地址范围内的互联网网站进行检索,一旦发现新的网站,它会自动提取网站的信息和网址加入自己的数据库。 另一种是提交网站搜索,即网站拥有者主动向搜索引擎提交网址,它在一定时间内(2天到数月不等)定向向你的网站派出“蜘蛛”程序,扫描你的网站并将有关信息存入数据库,以备用户查询。
当用户以关键词查找信息时,搜索引擎会在数据库中进行搜寻,如果找到与用户要求内容相符的网站,便采用特殊的算法——通常根据网页中关键词的匹配程度、出现的位置、频次、链接质量——计算出各网页的相关度及排名等级,然后根据关联度高低,按顺序将这些网页链接返回给用户。这种引擎的特点是搜全率比较高。
1.3 搜索引擎能解决什么问题 高效查询数据(运用多种算法查询数据,查询速率是毫秒级别,无论是千万条数据还是上亿的数据) 比较容易,将普通的数据库切换成搜索引擎比较容易。 大数据量、时效性、高并发等等。
1.4 搜索引擎的应用场景 数据库达到百万数据级别的时候 要求检索时效性、性能要求高,Ms级响应
1.5 Solr
接下来看在平常的互联网中搜索引擎的应用Solr。那么什么是Solr呢?它和es相比有什么优点和不足呢?
我们先来简单地介绍一下solr: Solr是一个基于Lucene的全文搜索服务器。同时对其进行了扩展,提供了比Lucene更为丰富的面向使用的查询语言,同时实现了可配置、可扩展并对查询性能进行了优化,并且提供了一个完善的功能管理界面。它支持Xml/Http协议,支持JSONAPI接口。
它具有如下特点: 可扩展性:Solr可以把建立索引和查询处理的运算分布到一个集群内的多台服务器上。 快速部署:Solr是开源软件,安装和配置都很方便,可以根据安装包内的Sample配置直接上手,可分为单机和集群模式。 海量数据:Solr是针对亿级以上的海量数据处理而设计的,可以很好地处理海量数据检索。 优化的搜索功能:Solr搜索速度够快,对于复杂的搜索查询Solr可以做到毫秒级的处理,通常,几十毫秒就能处理完一次复杂查询。
二、分词介绍
接下来,我们将了解分词是如何实现的。那么,我们为什么要去分词呢,这和搜索引擎有什么关系呢?我们在搜索框里输入的几个词或者一段话是如何拆成多个关键字的呢?
大家听说过哪些分词器吗?比如lucene自带的中文分词器smartcn,还有最常用的IK分词器等等,今天我们主要讲一下IK分词器。
2.1 IK分词器
IK分词器首先会维护几个词典来记录一些常用的词,如主词表:main2012.dic、量词表quantifier.dic、停用词stopword.dic。
Dictionary为字典管理类中,分别加载了这个词典到内存结构中。具体的字典代码,位于org.wltea.analyzer.dic.DictSegment。 这个类实现了一个分词器的一个核心数据结构,即Tire Tree。
Tire Tree(字典树)是一种结构相当简单的树型结构,用于构建词典,通过前缀字符逐一比较对方式,快速查找词,所以有时也称为前缀树。具体的例子如下。
举例
比如:我是北京海淀区中关村的中国人民。
我们设置的词典是:北京、海淀区、中关村、中国、中国人民,那么根据词典组成的字典树如图所示:
然后我们根据这个字典树来对这段话进行词语切分。IK分词器中,基本可以分为两种模式:一种是smart模式、一种是非smart模式,可以在代码中初始化的时候去配置。
我们其实不用解释这两种模式的字面含义,直接打印两种模式的结果就可以看出来: 原句:我是北京海淀区中关村的中国人民 smart模式:北京、海淀区、中关村、中国人民 非smart模式:北京、海淀区、中关村、中国、中国人民
显而易见,非smart模式是将能够分出来的词全部输出;smart模式是根据内在的方法输出一个合理的分词结果,这就涉及到了歧义判断。
举例
举个更有代表性的例子:张三说的确实在理。
根据正向匹配可能的词元链: L1:{张三,张,三} L2:{说} L3:{的确,的,确实,确,实在,实,在理,在,理}
首来看一下最基本的一些元素结构类: public class Lexeme implements Comparable{ …… //词元的起始位移 private int offset; //词元的相对起始位置 private int begin; //词元的长度 private int length; //词元文本 private String lexemeText; //词元类型 private int lexemeType; …… }
这里的Lexeme(词元),可以理解为是一个词语或单词。其中的begin,是指其在输入文本中的位置。注意,它是实现Comparable的,起始位置靠前的优先,长度较长的优先,这可以用来决定一个词在一条分词结果的词元链中的位置,可以用于得到上面例子中分词结果中的各个词的顺序。   /* * 词元在排序集合中的比较算法 * @see java.lang.Comparable#compareTo(java.lang.Object) */ public int compareTo(Lexeme other) { //起始位置优先 if(this.begin < other.getBegin()){ return -1; }else if(this.begin == other.getBegin()){ //词元长度优先 if(this.length > other.getLength()){ return -1; }else if(this.length == other.getLength()){ return 0; }else {//this.length < other.getLength() return 1; } }else{//this.begin > other.getBegin() return 1; } }
smart模式歧义消除算法:   词元位置权重比较(词元长度积),含义是选取长的词元位置在后的集合 L31:{的,确实,在理}1*1+2*2+3*2=11 L32:{的确,实,在理} 1*2+2*1+3*2=10 L33:{的确,实在,理} 1*2+2*2+3*1=9 最后的分词结果:张三,说,的,确实,在理
分词就介绍这么多,大家可以去读一下IK分词器的源码,深入地了解一下,源码地址: https://github.com/quentinxxz/Search/tree/master/IKAnalyzer2012FF_hf1_source/
三、倒排索引算法
3.1 介绍
我们可以把倒排索引算法想象成查字典时的目录一样,我们知道需要查的字的目录后,就会很快地查找到。如果用专业的语言解释的话就是: 倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(inverted file)。
倒排文件(倒排索引),索引对象是文档或者文档集合中的单词等,用来存储这些单词在一个文档或者一组文档中的存储位置,是对文档或者文档集合的一种最常用的索引机制。
搜索引擎的关键步骤就是建立倒排索引,倒排索引一般表示为一个关键词,然后是它的频度(出现的次数),位置(出现在哪一篇文章或网页中,及有关的日期,作者等信息),它相当于为互联网上几千亿页网页做了一个索引,好比一本书的目录、标签一般。读者想看哪一个主题相关的章节,直接根据目录即可找到相关的页面。不必再从书的第一页到最后一页,一页一页地查找。
3.2 Lucene倒排索引原理
Lucerne是一个开放源代码的高性能的基于java的全文检索引擎工具包,不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分文本分析引擎。目的是为软件开发人员提供一个简单易用的工具包,以方便在目标系统中实现全文检索的功能,或者以此为基础建立起完整的全文检索引擎。
假设有两篇文章1和2:
文章1的内容为: Jack lives in BeiJing,I live in BeiJing too.    文章2的内容为: He lived in Taiyuan.
1)取得关键词  
首先我们要用我们之前讲的方式分词,然后由于英文的原因,我们需要将in、on、of这些没用实际意义的词过滤掉,然后将第三人称单数加s或者过去式加ed这些词还原回去,如lived变回live,lives变回live,然后把不需要的标点符号也去掉。经过上面的处理之后,剩下的关键字为:
文章1的所有关键词为:[Jack] [live] [BeiJing] [i] [live] [BeiJing]    
文章2的所有关键词为:[he] [live] [Taiyuan]
2)建立倒排索引 关键词 文章号[出现频率] 出现位置    BeiJing 1[2] 3,6    he 2[1] 1    i 1[1] 4    live 1[2] 2,5, 2[1] 2    Taiyuan 2[1] 3    tom 1[1] 1
  以上就是lucene索引结构中最核心的部分。我们注意到关键字是按字符顺序排列的(lucene没有使用B树结构),因此lucene可以用二元搜索算法快速定位关键词。  
3.3 实现
实现时,lucene将上面三列分别作为词典文件(Term Dictionary)、频率文件(frequencies)、位置文件 (positions)保存。其中词典文件不仅保存有每个关键词,还保留了指向频率文件和位置文件的指针,通过指针可以找到该关键字的频率信息和位置信息。  
3.4 压缩算法
为了减小索引文件的大小,Lucene对索引还使用了压缩技术。
首先,对词典文件中的关键词进行了压缩,关键词压缩为<前缀长度,后缀>,例如:当前词为“阿拉伯语”,上一个词为“阿拉伯”,那么“阿拉伯语”压缩为<3,语>。
其次大量用到的是对数字的压缩,数字只保存与上一个值的差值(这样可以减小数字的长度,进而减少保存该数字需要的字节数)。例如当前文章号是16389(不压缩要用3个字节保存),上一文章号是16382,压缩后保存7(只用一个字节)。
3.5 使用原因
假设要查询单词 “live”,lucene先对词典二元查找、找到该词,通过指向频率文件的指针读出所有文章号,然后返回结果。词典通常非常小,因而,整个过程的时间是毫秒级的。   
而用普通的顺序匹配算法,不建索引,而是对所有文章的内容进行字符串匹配,这个过程将会相当缓慢,当文章数目很大时,时间往往是无法忍受的。
四、solr基本配置以及使用
我们在windows系统中安装solr。
下载地址 http://lucene.apache.org/solr/downloads.html
解压后:
cmd 进入solr的bin目录,使用命令 solr start(为了更方便,可以配置solr的环境变量,配好后可以直接在cmd中使用solr命名)
看到这个界面,说明solr服务启动成功,端口号为 8983,访问 http://localhost:8983,会自动跳转到http://localhost:8983/solr/#/
在这个页面会显示 solr的基本信息,lucene信息,Java信息等
然后我们介绍一个solr的指令: solr -h 可以看到solr的基本信息
配置solr
配置核心core
solr create -c mycore -d baisc_configs:-c参数指定定义的核心名称,-d参数指定配置目录
执行该命令后,会多出一个test目录。
访问Solr Admin页面: http://localhost:8983/,查看core,多了一个 test
在\solr-6.3.0\server\solr\test目录下有conf和data目录,分别对应配置和数据。
给core添加数据
打开目录:\solr-6.3.0\server\solr\test\conf,添加一个字段:
然后重启solr: solr restart -p 8983
到Solr Admin页面,选择core-test-document,在Document(s)中填写数据: { "id":"1", "name":"宝马" }
点击submit,返回Status: success,则代表添加数据成功。
多加几条后,点击Query,查询数据:
查询界面的 q,代表 查询条件,如输入:name:"宝马",再次执行查询
也可以直接get方式访问url: http://localhost:8983/solr/test/select?q=name:宝马
作者:杨亨
来源:宜信技术学院
大数据
2019-09-05 14:07:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>> 引言:十年沉淀、全球宽表排名第一、阿里云首发云Cassandra服务ApsaraDB for
Cassandra是基于开源Apache Cassandra,融合阿里云数据库DBaaS能力的分布式NoSQL数据库。Cassandra已有10年+的沉淀,基于Amazon DynamoDB的分布式设计和 Google Bigtable 的数据模型。具备诸多优异特性:采用分布式架构、无中心、支持多活、弹性可扩展、高可用、容错、一致性可调、提供类SQL查询语言CQL等。Cassandra为互联网业务而生,已在全球广大互联网公司有成熟应用,是目前最流行的宽表数据库。阿里云在2019年8月份全球首发云Cassandra服务。 宽表市场排名: 排名第一。且 Datastax、ScyllaDB、CosmosDB都兼容CQL,已成事实标准:
10+年Cassandra发展历程:
选择阿里云Cassandra服务的几大理由:
理由一:Cassandra为互联网业务而生
如果业务系统有事务的要求,我们建议使用MySQL或者阿里云POLARDB数据库,其提供的事务特性及SQL能力更匹配BOSS、CRM、ERP等业务需求。
而对于互联网业务有如下特点:极致在线、高并发、大容量、可调的一致性、灵活扩展,MySQL并非最佳选择,Cassandra应运而生。 极致在线:支持多机房部署,单节点/单机房故障时,业务不中断。 扩展性强:支持从160GB到10PB的容量,支持从数千QPS到数千万的QPS,支持实例规格扩展,支持从单节点到多节点,支持从单机房到多机房,还将提供从同城到异地等多种扩展能力。匹配业务系统从小到大持续发展的特点,既满足不同发展时期对性能、成本、可靠性、灾备等不同要求,又可保障业务的平滑演进及连续性。 可调一致性:支持多种读写一致性级别,用户可依据自身业务系统特点灵活配置。比如对于物联网等业务,可以放宽一致性要求以获取更高的性能;对一致性要求较高的业务,则可选择QUORUM等强一致性级别。
如果您有这样的业务:聊天消息、新闻推送、历史订单、对象存储、购物车、计费系统、评论等,以及数据驱动的业务如风控、推荐、用户画像、物联网、日志分析等,阿里云Cassandra数据库是一个极好的选择。
理由二:开发者&DBA容易上手
一个熟悉MySQL的开发者,可以在半天内完全掌握Cassandra用法;一个熟悉MySQL的DBA,可以在一天内掌控Cassandra。Cassandra是一个易用易运维的数据库,有如下优点: CQL借鉴SQL语法,方便MySQL及传统数据库开发者直接使用 支持安全认证、SSL及完整鉴权体系,让DBA更省心 商业化前会支持备份恢复的能力,让DBA更放心 支持完整的索引服务,比如localindex、MV视图、SASI全文索引,方便开发者使用 原生提供各类语言的客户端:Java、Python、PHP、.Net、Nodejs等原生客户端(非thrift模式)连接Cassandra,性能跟JAVA客户端一致
理由三:大数据存储的利器、支持搭配X-Pack Spark
Spark是目前主流默认的大数据处理引擎,Cassandra是目前主流的BigData NoSQL数据库。Cassandra+Spark可以处理风控、推荐、物联网等很多数据驱动的业务。阿里云数据库团队也提供了X-Pack Spark引擎支持Cassandra,满足用户诉求。
理由四:阿里云数据库团队保驾护航、承诺7*24小时保障稳定性及安全
ApsaraDB for Cassandra是阿里云数据库团队精心构建的云服务,在商业化之前会原生支持同城多机房、备份恢复服务。阿里云数据库Cassandra团队承诺,会7*24小时保障您数据库的稳定性及安全,会不断改进性能,提供贴合业务的功能,为您的业务保驾护航。
本文作者:ApsaraDB Cassandra
原文链接
本文为云栖社区原创内容,未经允许不得转载。
大数据
2019-09-04 15:49:00
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
今天测试pig的一个小用法需要加载本地文件,又是第一次用pig,结果本地文件读取不到,后经搜索得到需要在pig命令行模式下加载本地根目录: grunt> ls /
这样,重新运行,就可以得到本地文件了
附上解决方案来源: https://www.cnblogs.com/jamesf/p/4751606.html
大数据
2019-07-29 22:27:00