数据专栏

智能大数据搬运工,你想要的我们都有

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

「深度学习福利」大神带你进阶工程师,立即查看>>>
虚拟机以及Linux系统安装在之前的两篇分享中已经详细的介绍了方法,并且每一步的都配图了。如果有朋友还是看不懂,那我也爱莫能助了。本篇主要就hadoop服务器操作系统配置进行详细说明,hadoop安装会在下一篇文章中详细的介绍。 hadoop安装包用的是大快DKHadoop发行版,个人觉得DKHadoop的安装过程是比较简单,关于dkhadoop的安装在下一次的分享中再详细介绍吧。下面进入到本篇的主题——服务器操作系统配置教程 一、安装包准备
1、虚拟机分布式安装(三台及以上虚拟机) 若是个人电脑Windows系统或Linux系统,虚拟机上虚拟的三台服务器,则将安装包拷贝到服务器上,进行安装操作即可。 2、物理集群(三台及以上实体服务器) 1、外网下载模式 2、本地文件模式 将安装包文件直接拷贝到实体服务器root目录下,进行安装操作即可。 3、上传模式 当服务器在机房,且无显示和输入设备的情况下,应用该模式。 现将文件,安装在本地计算机上(默认为笔记本电脑,在机房现场),并将计算机与服务器连接,将安装包install、DKHInstall上传到服务器root目录下。
二、服务器操作系统配置教程
准备工作完成之后,就要进行服务器配置操作,首先必须三台服务器之间要互相ping通。所谓ping通,就是两台设备之间网络是通的。从一端发送一个数据包,另一端就能够收到,就代表两台设备是能够ping通了。 1、修改权限 目的:使install、DKHInstall两个安装包有可执行的权限。权限不足无法执行一些操作。install里面是脚本和所有组件,DKHInstall里面是安装界面。 步骤:在准备工作中拷贝安装包 install、DKHInstall到主节点目录后,修改文件权限。首先进入root目录,安装目录 install、DKHInstall的权限改为:文件所有者可读可写可执行,与文件所有者属于一个用户组的其他用户可读可执行,其他用户组可读可执行。 命令: cd /root/ unzip DKHPlantform.zip chmod -R 755 DKHPlantform
2、 搭建Hadoop集群设置SSH免密登录
目的:Hadoop运行过程中需要管理远端Hadoop守护进程,在Hadoop启动以后,NameNode是通过SSH(Secure Shell)来启动和停止各个DataNode上的各种守护进程的。这就必须在节点之间执行指令的时候是不需要输入密码的形式,所以我们需要配置SSH运用无密码公钥认证的形式,这样NameNode使用SSH无密码登录并启动DataName进程,同样原理,DataNode上也能使用SSH无密码登录到NameNode。 步骤: (1)修改本机hosts文件,写入对应关系 为了区分局域网内的每台主机,都会给主机配一个主机名,每台主机之间又是通过IP进行通信,但IP地址不方便记忆,所以配置主机名和IP映射能够实现主机之间的快速方便的访问。 命令: vi /etc/hosts 通过按键盘上的insert或是I键进入编辑模式,编辑完成后按一下Esc键然后按一下按Shift+: 键,输入wq,后回车就可以保存。输入q!后回车则是放弃保存并退出. 进入编辑模式后,按照规则写入主机与ip的对应关系(主机名称dk41是自己命名的,如下图)例: 192.168.1.41 dk41 192.168.1.42 dk42 192.168.1.43 dk43 编辑完后,保存退出。把对应关系拷到其他两台或多台机器上。 命令: scp  -r  /etc/hosts  192.168.1.42:/etc scp  -r  /etc/hosts  192.168.1.43:/etc (2)执行集群之间免密前的准备工作
执行sshpass.sh的脚本的时候会去读sshhosts和sshslaves这两个文件,替换sshpass.sh中的master和slave文件。 修改文件sshhosts,输入全部机器的主机名,每行一个主机名(如下图) 命令: vi /root/DKHPlantform/autossh/sshhosts 通过按键盘上的insert或是I键进入编辑模式,编辑完成后按一下Esc键然后按一下按Shift+:键,输入wq,后回车就可以保存。输入q!后回车则是放弃保存并退出.
修改文件sshslaves,写入除主机名之外的所有机器名(如下图) 命令: vi /root/DKHPlantform/autossh/sshslaves 通过按键盘上的insert或是I键进入编辑模式,编辑完成后按一下Esc键然后按一下按Shift+:键,输入wq,后回车就可以保存。输入q!后回车则是放弃保存并退出.
(3)执行集群免密工作
命令: cd /root/DKHPlantform/autossh ./autossh 主节点主机名 集群密码 例: ./autossh dk41 123456 (4)关闭防火墙 防止访问服务器时某些服务被拦截,需关闭防火墙。 命令: cd /root/DKHPlantform/autossh ./offIptables.sh
3、 安装双机热备份的MySQL
目的:存放Hive的元数据 步骤: (1)从主节点分发mySQL安装目录到第二节点 命令: scp -r /root/DKHPlantform/mysqlInst/ 192.168.1.42:/root/ (2)主节点执行: 命令: cd /root/DKHPlantform/mysqlInst/ ./mysql.sh 1 从节点执行: 命令: cd /root/mysqlInst/ ./mysql.sh 2
(3)执行成功之后执行热备份(两台机器上都要执行,两个ip互换,41上写42,42上写41,密码是MySQL的密码为:123456。平台内已经设定好,请勿修改): 命令: source /etc/profile ./sync.sh 192.168.1.xxx(另一台mysql地址) 4、 创建数据库
目的:MySQL是一种关联数据库管理系统,关联数据库将数据保存在不同的表中,增加了速度并提高了灵活性。 步骤: (1)导入MySQL数据表,只在主节点执行: 命令: mysql -uroot -p123456 < { 此处为sql文件,主目录下文件: dkh.sql} 如:mysql -uroot -p123456 5、启动安装
目的:服务器配置操作完成之后,启动DKH。 步骤:执行以下命令。 命令: cd /root/DKHPlantform/dkh-tomcat*/bin/ ./startup.sh 6、本地时间服务器搭建步骤 没联网或者装系统时时间未同步,需要搭建本地时间服务器。 (1)搭建内网的ntp服务器 修改/etc/ntp.conf 命令: Vim /etc/ntp.conf 通过按键盘上的insert或是I键进入编辑模式,编辑完成后按一下Esc键然后按一下按Shift+:键,输入wq后回车就可以保存。输入q!后回车则是放弃保存并退出. 修改下列三行: #server 0.centos.pool.ntp.org   #server 1.centos.pool.ntp.org   #server 2.centos.pool.ntp.org    在文件最后添加下列两行: server  127.127.1.0    fudge   127.127.1.0 stratum 10 (2)启动ntp服务 service ntpd start (3)开机自动启动 chkconfig ntpd on (4)客户端同步时间 命令: Vim /etc/ntp.conf 通过按键盘上的insert或是I键进入编辑模式,编辑完成后按一下Esc键然后按一下按Shift+:键,输入wq,后回车就可以保存。输入q!后回车则是放弃保存并退出. 在文件最后添加一行: */15 * * * * root ntpdate 192.168.27.35;hwclock -w 
大数据
2018-08-01 13:06:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
hive 语句执行顺序 大致顺序 from... where.... select...group by... having ... order by...
explain查看执行计划
hive语句和mysql都可以通过explain查看执行计划,这样就可以查看执行顺序,比如 explain select city,ad_type,device,sum(cnt) as cnt from tb_pmp_raw_log_basic_analysis where day = '2016-05-28' and type = 0 and media = 'sohu' and (deal_id = '' or deal_id = '-' or deal_id is NULL) group by city,ad_type,device
显示执行计划如下 STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Map Operator Tree: TableScan alias: tb_pmp_raw_log_basic_analysis Statistics: Num rows: 8195357 Data size: 580058024 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (((deal_id = '') or (deal_id = '-')) or deal_id is null) (type: boolean) Statistics: Num rows: 8195357 Data size: 580058024 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: city (type: string), ad_type (type: string), device (type: string), cnt (type: bigint) outputColumnNames: city, ad_type, device, cnt Statistics: Num rows: 8195357 Data size: 580058024 Basic stats: COMPLETE Column stats: NONE Group By Operator aggregations: sum(cnt) keys: city (type: string), ad_type (type: string), device (type: string) mode: hash outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 8195357 Data size: 580058024 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) sort order: +++ Map-reduce partition columns: _col0 (type: string), _col1 (type: string), _col2 (type: string) Statistics: Num rows: 8195357 Data size: 580058024 Basic stats: COMPLETE Column stats: NONE value expressions: _col3 (type: bigint) Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0) keys: KEY._col0 (type: string), KEY._col1 (type: string), KEY._col2 (type: string) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 4097678 Data size: 290028976 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: bigint) outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 4097678 Data size: 290028976 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows: 4097678 Data size: 290028976 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator limit: -1
具体介绍如下 **stage1的map阶段** TableScan:from加载表,描述中有行数和大小等 Filter Operator:where过滤条件筛选数据,描述有具体筛选条件和行数、大小等 Select Operator:筛选列,描述中有列名、类型,输出类型、大小等。 Group By Operator:分组,描述了分组后需要计算的函数,keys描述用于分组的列,outputColumnNames为输出的列名,可以看出列默认使用固定的别名_col0,以及其他信息 Reduce Output Operator:map端本地的reduce,进行本地的计算,然后按列映射到对应的reduce **stage1的reduce阶段Reduce Operator Tree** Group By Operator:总体分组,并按函数计算。map计算后的结果在reduce端的合并。描述类似。mode: mergepartial是说合并map的计算结果。map端是hash映射分组 Select Operator:最后过滤列用于输出结果 File Output Operator:输出结果到临时文件中,描述介绍了压缩格式、输出文件格式。 stage0第二阶段没有,这里可以实现limit 100的操作。
总结 1,每个stage都是一个独立的MR,复杂的hql语句可以产生多个stage,可以通过执行计划的描述,看看具体步骤是什么。 2,执行计划有时预测数据量,不是真实运行,可能不准确
大数据
2018-07-31 20:03:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、sqoop1.4.6 to hbase1.2 in hue4.1
CM安装 cdh 5.15.x, 因想用简单的sqoop1所以parce装的sqoop1 client,经测试报:SQOOP_CONF_DIR找不到,mysql drive no find ,检查发现安装目录和classpath都有,依然如此,后找文档说是cdh只支持sqoop2,sqoop1需要手配置 oozie/libext并上传到hdfs下,链接https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_oozie_sqoop_jdbc.html,翻译如下 :
在Oozie中使用Sqoop动作
Sqoop 1不附带第三方JDBC驱动程序。您必须单独下载它们并将它们保存到在/ var / lib中/ sqoop /Oozie服务器上的目录。有关更多信息,请参阅 使用命令行设置Apache Sqoop 。

建议 Cloudera建议您不要将Sqoop CLI命令与Oozie Shell Action一起使用。此类部署不可靠,并且在升级和配置更改期间容易中断。 要将数据导入Hive,请使用Sqoop Action和Hive2 Action的组合。 SQoop Action简单地将数据摄入HDFS。 Hive2 Action将数据从HDFS加载到Hive中。
部署和配置Oozie Sqoop1操作JDBC驱动程序
在开始此过程之前,请确认您的Sqoop1 JDBC驱动程序是否存在 在/ var / lib中/ sqoop。
SSH到Oozie服务器主机并执行以下命令在HDFS上部署和配置驱动程序: cd / var / lib / sqoop sudo -u hdfs hdfs dfs -mkdir / user / oozie / libext sudo -u hdfs hdfs dfs -chown oozie:oozie / user / oozie / libext sudo -u hdfs hdfs dfs -put /opt/cloudera/parcels/SQOOP_NETEZZA_CONNECTOR/sqoop-nz-connector*.jar / user / oozie / libext / sudo -u hdfs hdfs dfs -put /opt/cloudera/parcels/SQOOP_TERADATA_CONNECTOR/lib/*.jar / user / oozie / libext / sudo -u hdfs hdfs dfs -put /opt/cloudera/parcels/SQOOP_TERADATA_CONNECTOR/sqoop-connector-teradata*.jar / user / oozie / libext / sudo -u hdfs hdfs dfs -put /var/lib/sqoop/*.jar / user / oozie / libext / sudo -u hdfs hdfs dfs -chown oozie:oozie /user/oozie/libext/*.jar sudo -u hdfs hdfs dfs -chmod 755 /user/oozie/libext/*.jar sudo -u hdfs hdfs dfs -ls / user / oozie / libext #[/ user / oozie / libext的样本内容] -rwxr-xr-x 3 oozie oozie 959987 2016-05-29 09:58 /user/oozie/libext/mysql-connector-java.jar -rwxr-xr-x 3 oozie oozie 358437 2016-05-29 09:58 /user/oozie/libext/nzjdbc3.jar -rwxr-xr-x 3 oozie oozie 2739670 2016-05-29 09:58 /user/oozie/libext/ojdbc6.jar -rwxr-xr-x 3 oozie oozie 3973162 2016-05-29 09:58 /user/oozie/libext/sqoop-connector-teradata-1.5c5.jar -rwxr-xr-x 3 oozie oozie 41691 2016-05-29 09:58 /user/oozie/libext/sqoop-nz-connector-1.3c5.jar -rwxr-xr-x 3 oozie oozie 2405 2016-05-29 09:58 /user/oozie/libext/tdgssconfig.jar -rwxr-xr-x 3 oozie oozie 873860 2016-05-29 09:58 /user/oozie/libext/terajdbc4.jar
配置Oozie Sqoop1操作工作流JDBC驱动程序
使用以下步骤配置Oozie Sqoop1操作工作流: 确认HDFS中存在Sqoop1 JDBC驱动程序。为此,请通过SSH连接到Oozie Server主机并运行以下命令: sudo -u hdfs hdfs dfs -ls / user / oozie / libext 在Oozie中配置以下Oozie Sqoop1 Action工作流变量 job.properties 文件如下: oozie.use.system.libpath = true oozie.libpath = / user / oozie / libext
然而并无卵用,后国外社区 上有说driver传hdfs://usr/share/lib/sqoop下,试了下, SQOOP_CONF_DIR依然找不到,mysql drive no find 解决了但不能用sql user name 登录,找不到用户,而这一切在命令行下完全正常,报错如下: Sqoop command arguments : list-databases --connect jdbc:mysql://172.16.12.44:3306/ --username bigdata_ro --password ******** Fetching child yarn jobs tag id : oozie-ed23d117051393fc0989abc9110ba08d 2018-07-31 18:58:46,734 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at master/172.16.60.151:8032 Child yarn jobs are found - ================================================================= >>> Invoking Sqoop command line now >>> 2018-07-31 18:58:46,957 [main] WARN org.apache.sqoop.tool.SqoopTool - $SQOOP_CONF_DIR has not been set in the environment. Cannot check for additional configuration. 2018-07-31 18:58:47,013 [main] INFO org.apache.sqoop.Sqoop - Running Sqoop version: 1.4.6-cdh5.15.0 2018-07-31 18:58:47,027 [main] WARN org.apache.sqoop.tool.BaseSqoopTool - Setting your password on the command-line is insecure. Consider using -P instead. 2018-07-31 18:58:47,039 [main] WARN org.apache.sqoop.ConnFactory - $SQOOP_CONF_DIR has not been set in the environment. Cannot check for additional configuration. 2018-07-31 18:58:47,124 [main] INFO org.apache.sqoop.manager.MySQLManager - Preparing to use a MySQL streaming resultset. 2018-07-31 18:58:47,467 [main] ERROR org.apache.sqoop.manager.CatalogQueryManager - Failed to list databases java.sql.SQLException: Access denied for user 'bigdata_ro'@'172.16.60.149' (using password: YES) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1078) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4187) at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4119) .... <<< Invocation of Sqoop command completed <<< No child hadoop job is executed. Intercepting System.exit(1) <<< Invocation of Main class completed <<< Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SqoopMain], exit code [1] Oozie Launcher failed, finishing Hadoop job gracefully

至此检查所有相关配置文件发现,hue sqoop根本没走 sqooq脚本,所以所有设置无效,而是走oozie的一个类:org.apache.oozie.action.hadoop.SqoopMain,所以可能Hue里只支持sqoop2,帮暂时搁置,先导入数据 以后研究,,,,,,,
二、sqoop1.4.6 to hbase1.2 in shell:
# sqoop list-databases --connect jdbc:mysql://172.16.12.44:3306/che100 --username bigdata_ro --password *******
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
18/07/31 19:15:15 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.15.0
18/07/31 19:15:15 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
18/07/31 19:15:15 INFO teradata.TeradataManagerFactory: Loaded connector factory for 'Cloudera Connector Powered by Teradata' on version 1.7c5
18/07/31 19:15:15 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
admin
......
# sqoop import --connect jdbc:mysql://172.16.12.44:3306/che100 --username bigdata_ro --password ******* --table tc_district --hbase-table tc_district --column-family f1 --hbase-row-key id --check-column update_dt --incremental lastmodified --last-value 0 -m 1
# hbase shell
hbase(main):001:0> list
TABLE
tc_district
1 row(s) in 0.2410 seconds
hue hive建外表关联hbase表并刷新结构到impala :
CREATE EXTERNAL TABLE che100.tc_district(
id INT,
dname STRING,
pid INT,
update_dt TIMESTAMP)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key, f1:dname, f1:pid, f1:update_dt") //注意 这里key 是默认的不能用导入 的rwokey 字段 id,提示多一列,其它注意数据类型转化,INT ,TINYINT,BOOLEAN, TIMESTAMP,decimal(11,6)
TBLPROPERTIES("hbase.table.name" = "tc_district");
hue impala 刷新后找到相关表后查询测试:
INVALIDATE METADATA
SELECT * FROM `che100`.`tc_district` LIMIT 100
至此一切正常,但想从hbase browse里看hbase数据,表都刷不出来:文档得知为了安全,hbase thrift 不允许直接对外访问,因为这样所有人知道你的thrift 地址都可以直接链的hbase服务器了:https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_sg_hbase_security.html翻译如下:
想学习如何自定义权限认证,请参考过往记忆的博文: HiveServer2(Spark ThriftServer)自定义权限认证
HBase身份验证
要配置HBase安全性,请完成以下任务: 配置HBase身份验证 :您必须为HBase服务器和客户端建立一种机制,以便使用HDFS,ZooKeeper和彼此安全地识别自己。这可以确保主机是他们声称的主机。 注意: 要使HBase能够使用Kerberos安全性,必须执行 在CDH 5 和 ZooKeeper安全配置 中 配置Hadoop安全性中 的安装和配置步骤。 虽然HBase Thrift服务器可以连接到安全的Hadoop集群,但是从客户端到HBase Thrift服务器的访问不受保护。要加密客户端与HBase Thrift服务器之间的通信,请参阅 为HBase Thrift服务器配置TLS / SSL 。 以下部分描述了如何将Apache HBase和CDH 5与Kerberos安全性一起使用: 为HBase配置Kerberos身份验证 配置安全HBase复制 配置HBase客户端TGT续订周期 配置HBase授权 :您必须为允许客户端访问的资源建立规则。有关更多信息,请参阅 配置HBase授权 。

使用Hue HBase App
Hue包含一个 HBase应用程序 ,允许您通过Thrift代理服务器与HBase交互。因为Hue位于Thrift服务器和客户端之间,所以Thrift服务器假定所有HBase操作都来自色调用户而不是客户。确保只允许Hue中的用户执行分配给他们自己的凭据的HBase操作,而不是那些色调用户,必须启用 HBase模拟 。
Hue团队 发布于2015年3月25日, HBase , Video
38评论
Hue附带了一个 HBase应用程序 ,可以让您创建表格,搜索行,阅读单元格内容......只需点击几下。我们现在很高兴发布最后一个缺失的安全性(在即将到来的Hue 3.8中可用),以便让应用程序生产准备就绪!


HBase应用程序通过代理服务器(称为 Thrift Server V1 )与HBase通信,代理服务器将命令转发给HBase。因为Hue位于代理服务器和实际用户之间,所以代理服务器认为所有操作(例如创建表,扫描某些数据......)来自'hue'用户而不是实际的Web用户。这显然不是很安全!
为了确保HBase应用程序真正安全,我们需要: 确保Hue中的实际登录用户使用其权限执行操作。这是假冒的工作。 确保Hue服务器仅发送这些调用。这是Kerberos强身份验证的工作。

注意
我们假设您已在群集中安装了HBase Thrift Server。如果使用Cloudera Manager,请转到HBase服务实例列表,然后单击“添加角色实例”并选择“HBase Thrift Server”。

模拟
现在可以将HBase配置为 提供模拟 (使用或不使用Kerberos)。在我们的例子中,这意味着用户可以通过Hue向HBase发送命令,而不会失去他们将在自己的凭据(而不是'hue'用户)下运行的事实。
首先,确保在hbase-site.xml中有这个:
1
2
3
4

6
7
8
9
< property >
< name >hbase.thrift.support.proxyuser
< value >true


< property >
< name >hbase.regionserver.thrift.http
< value >true


注意
如果使用Cloudera Manager,可以通过在HBase服务的配置搜索中键入“thrift”并检查前两个结果来完成。

然后在core-site.xml中检查HBase是否有权冒充某人:
1
2
3
4

6
7
8
9
< property >
< name >hadoop.proxyuser.hbase.hosts
< value >*


< property >
< name >hadoop.proxyuser.hbase.groups
< value >*


最后检查Hue是否指向其hue.ini中指定的HBase的本地配置目录:
1
2
[hbase]
hbase_conf_dir= /etc/hbase/conf

注意
如果您使用的是Cloudera Manager,则可能需要在Hue配置中选择HBase Thrift服务器,并在hue_safety_valve.ini的Hue服务高级配置代码段(安全阀)中输入类似的内容。
1
2
[hbase]
hbase_conf_dir={{HBASE_CONF_DIR}}

就是这样,启动HBase Thrift Server和Hue,你就准备好了!

Kerberos的安全性
现在Hue可以向HBase Thrift Server发送命令并告诉他以某个用户身份执行它们,我们需要确保只允许Hue执行此操作。我们使用Kerberos来强烈验证HBase服务的用户。在我们的例子中,HBase Thrift服务器只有在来自Hue用户时才会接受命令。
确保HBase配置了 Kerberos ,并且您在Hue指向的hbase-site.xml中具有此功能:

1
2
3
4

6
7
8
9
< property >
< name >hbase.security.authentication
< value >KERBEROS


< property >
< name >hbase.thrift.kerberos.principal
< value >hbase/_HOST@ENT.CLOUDERA.COM




注意
如果在没有模拟的情况下使用Cloudera Manager或常规Thrift,请确保设置“HBase Thrift Authentication”hbase.thrift.security.qop必须设置为以下之一: auth-conf:身份验证,完整性和机密性检查 auth-int:身份验证和完整性检查 auth:仅限身份验证
如果使用Cloudera Manager,请转至“Hbase服务>配置>服务范围/安全:HBase Thrift身份验证”,然后选择以下三个选项之一。
与上面类似,请确保hue.ini指向带有hbase-site.xml的有效目录:
1
2
[hbase]
hbase_conf_dir= /etc/hbase/conf
要么
1
2
[hbase]
hbase_conf_dir={{HBASE_CONF_DIR}}

注意
如果使用模拟,请确保HTTP / _HOST主体位于其HBase Thrift服务器的密钥表中。

重启HBase和Hue,它们现在应该全部安全!

结论
您现在可以确定Hue用户只能查看或修改HBase级别允许的内容。Hue保证如果用户不能在HBase shell中执行某个操作,它将通过Hue完全相同(Hue就像HBase顶部的'view')。
请注意,HBase选择通过 HTTP Thrift 支持模拟,因此在使用模拟时常规Thrift不起作用。之前的Kerberos支持现在也很有意义,因为所有操作都不再来自Hue用户了!只需点击一下即可完成所有这些配置步骤。

现在是时候玩表示 例 并打开HBase给所有用户了!


像往常一样随意评论 色调用户 列表或 @gethue !

注意
此错误意味着上面的'hadoop.proxyuser.hbase.hosts'/'hadoop.proxyuser.hbase.groups'属性不正确:
1
2
3
Api Error: Error 500 User: hbase is not allowed to impersonate romain HTTP ERROR 500 Problem accessing /.
Reason: User: hbase is not allowed to impersonate bob Caused by:javax.servlet.ServletException:
User: hbase is not allowed to impersonate bob at org.apache.hadoop.hbase.thrift.ThriftHttpServlet.doPost(ThriftHttpServlet.java:117) at
注意
您现在可能会看到如下所示的权限错误。
1
Api Error: org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions (user=admin, scope=default, action=CREATE)...
这是因为: 您正在使用模拟,而您的用户'bob'没有足够的HBase权限 您没有使用模拟,而'hue'用户没有足够的HBase权限

解决此问题的一种快速方法是只提供所有权限。显然,这不建议用于真正的设置,而是阅读有关 HBase访问控制的 更多信息!
1
2
3
sudo -u hbase hbase shell

hbase(main):004:0> grant 'bob' , 'RWC'

注意
如果你得到一个“Api错误:TSocket读取0字节”,这是因为Hue不知道Thrift服务器正在期待Thrift HTTP。仔细检查Hue是否指向包含hbase.regionserver.thrift.http属性设置为true的hbase-site.xml。
暂时的黑客将是在hue.ini中插入:
1
2
[hbase]
use_doas= true

注意
“Api错误:超出最大递归深度”表示HBase Thrift服务器未作为HTTP Kerberos服务运行。
在最新的Hue 3.8中,您现在应该只得到401错误。

注意
使用模拟时未测试缓冲传输模式,但可能有效。

注意
如果您收到此错误:
1
Caused by: org.apache.hadoop.hbase.thrift.HttpAuthenticationException: Authorization header received from the client is empty.
您很可能会点击 https://issues.apache.org/jira/browse/HBASE-13069 。还要确保HTTP / _HOST主体位于其HBase Thrift Server的keytab中。请注意,作为后续行动,您可能会获得 https://issues.apache.org/jira/browse/HBASE-14471 。
还有一个 框架 传输问题尚未得到支持。我们建议使用 缓冲 传输。
至此如果还有错误,请移步到hue登录用户侧点 hue administers,或访问http://master:8889/hue/about/会自动检查配置: Configuration Step 2: Examples Step 3: Users Step 4: Go!
Checking current configuration
Configuration files located in /run/cloudera-scm-agent/process/332-hue-HUE_SERVER

Potential misconfiguration detected. Fix and restart Hue.

OOZIE_EMAIL_SERVER HBase Browser
Email notifications is disabled for Workflows and Jobs as SMTP server is localhost. Failed to authenticate to HBase Thrift Server, check authentication configurations.
在查相关文档解决:但看了最新版hue4.1之后,第一句话让你看到原来它是支持SQOOP1的,只是使用方式完全界面化了 ,且配置文件根本不在相关conf 下,而是每次启动会动态创建一个,omg --!:

发布者 色相队 在2017年8月24日,在 顺化4.1 , SQL , Sqoop
10评论
明年将在Hue 4.1和CDH 6中推出令人兴奋的新功能。其中之一是Hue的全新工具,使用Apache Sqoop 1将关系数据库中的数据导入HDFS文件或Hive表。它使我们能够通过交互式UI只需几次点击即可将大量数据带入集群。此Sqoop连接器已添加到Hue 的现有 导入数据向导 中。

过去,使用Sqoop命令行界面导入数据可能是一个麻烦且低效的过程。该任务期望用户对Sqoop有很好的了解。例如,他们需要将一系列必需的参数放在一起,这些参数具有特定的语法,这将导致容易出错。通常,正确地获取这些可能需要几个小时的工作。现在有了Hue的新功能,您可以在几分钟内完成Sqoop的工作。进口在YARN上运行,由Oozie安排。本教程提供了有关如何执行此操作的分步指南。



教程
你需要什么
首先,您需要在其中配置Apache Sqoop,Apache YARN,Apache Oozie和Hue的正在运行的集群。
接下来,您需要安装特定于数据库的JDBC jar。为此,请将它们放在HDFS上的某个目录中。
要获得MySQL自动完成功能,需要配置Lib RDBMS和笔记本:http://gethue.com/custom-sql-query-editors/
此外,您需要通过在 Hue ini中 的索引器部分下将enable_sqoop设置为true来打开该功能。
注意:
如果使用Cloudera Manager,请检查如何在hue.ini安全阀中添加属性并将上述参数值放在那里。

选择源表
现在让我们开始吧!
在本教程中,我们将从Teradata导入表到Apache Hive。单击左侧窗格中的汉堡菜单,然后选择屏幕左下角的选项以导航到Hue的索引器部分。从“类型”下拉列表中选择“外部数据库”。
选择数据库有两种模式:
预配置 - 允许您快速选择管理员已在Hue中配置的数据库。 自定义 - 允许您通过在真实自助服务模式下提供必要的凭据来访问所需的任何数据库。
注意:任一模式中的JDBC选项都允许我们使用JDBC特定凭据指向数据库。
我们现在将选择自定义模式,提供数据库凭据并启动测试连接。测试通过后,下拉列表将填充数据库名称列表。在数据库选择时,将填充表名列表。在桌面选择上,快速预览非常方便。您还可以选中“所有表”选项以一次性导入特定数据库的所有表。

选择目的地
完成源页面后,单击“下一步”以导航到目标页面。在这里,选择Destination类型,它可以是HDFS文件或Hive表。还要选择Sqoop运行导入作业所需的所有数据库特定jar。由于我们选择了Teradata,我们将选择所有teradata特有的罐子。
我们还可以为我们的导入作业添加额外的选项,如映射器编号,输出数据格式,分隔符,详细模式,拆分选项,压缩模式等。文档部分对此进行了详细说明。
我们甚至可以重命名列名称,并通过取消选中“保持”复选框来过滤掉不需要的列。
现在,让我们点击“提交”按钮。在这样做时,会生成一个Sqoop作业,可以在 Hue的作业浏览器中 进行跟踪。
完成工作后,我们可以通过对新导入的数据执行Hive / Impala查询,利用 Hue的编辑器 进行数据处理和查询分析。
文档
自己组装lib目录
我们需要执行Sqoop导入作业所需的所有库。该要求特定于正在使用的数据库。下面列出了一些流行数据库所需的罐子: Oracle: oracle-connector-java.jar MySQL: mysql-connector-java.jar Teradata: teradata-connector-java.jar,sqoop-connector-teradata-1.3c5.jar,tdgssconfig.jar,terajdbc4.jar PostgreSQL: postgresql-connector-java.jar
设置
属性提供了许多其他选项来进一步调整导入操作以适应您的特定工作负载。 Libs: Sqoop 1所需的数据库特定库 映射器 : 使用n个映射任务并行导入 拆分方: 用于拆分工作单位的表格列 详细模式: 在工作时打印更多信息 压缩模式: 启用压缩 格式: 数据可以以3种不同的格式导入:文本,avro,序列 字段: 设置字段分隔符(仅在格式为文本时启用) 行: 设置行尾字符(仅在格式为文本时启用) (可选)封闭方式: 设置包含字符的字段(仅在格式为文本时启用)

支持的数据库
Sqoop 1支持的任何数据库。

故障排除
在导入过程中,列的数据类型可以更改为HDFS / Hive兼容的数据类型。导入表时,主键用于为映射器创建拆分。如果没有主键,则需要明确选择拆分列; 如果不这样做会导致导入失败。在执行全表导入时,如果所有表都没有主键,则导入作业将失败。此外,如果由于某种原因作业失败,您可以从作业跟踪器中的日志中找出失败的原因。如需进一步帮助,请访问 https://sqoop.apache.org/docs/1.4.6/
大数据
2018-07-31 19:53:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. hive脚本的执行方式
hive脚本的执行方式大致有三种:
1. hive控制台 执行;
2. hive -e "SQL" 执行;
3. hive -f SQL文件 执行;
参考hive的用法 usage: hive -d,--define Variable subsitution to apply to hive commands. e.g. -d A=B or --define A=B --database Specify the database to use -e SQL from command line -f SQL from files -H,--help Print help information -h connecting to Hive Server on remote host --hiveconf Use value for given property --hivevar Variable subsitution to apply to hive commands. e.g. --hivevar A=B -i Initialization SQL file -p connecting to Hive Server on port number -S,--silent Silent mode in interactive shell -v,--verbose Verbose mode (echo executed SQL to the console)
1.1. hive控制台 执行
顾名思义,是进入hive控制台以后,执行sql脚本,例如: hive> set mapred.job.queue.name=pms; hive> select page_name, tpa_name from pms.pms_exps_prepro limit 2; Total MapReduce jobs = 1 Launching Job 1 out of 1 ... Job running in-process (local Hadoop) 2015-10-23 10:06:47,756 null map = 100%, reduce = 0% 2015-10-23 10:06:48,863 null map = 23%, reduce = 0% 2015-10-23 10:06:49,946 null map = 38%, reduce = 0% 2015-10-23 10:06:51,051 null map = 72%, reduce = 0% 2015-10-23 10:06:52,129 null map = 100%, reduce = 0% Ended Job = job_local1109193547_0001 Execution completed successfully Mapred Local Task Succeeded . Convert the Join into MapJoin OK APP首页 APP首页_价格比京东低 APP首页 APP首页_价格比京东低 Time taken: 14.279 seconds hive>
1.2. hive -e "SQL" 方式执行
利用 hive -e "SQL" 的方式进入hive控制台并直接执行sql脚本,例如: hive -e " set mapred.job.queue.name=pms; set mapred.job.name=[HQL]exps_prepro_query; select page_name, tpa_name from pms.pms_exps_prepro limit 2;"
1.3. hive -f SQL文件 方式执行
执行sql文件中的sql脚本,例如:
pms_exps_prepro.sql 文件内容如下: set mapred.job.queue.name=pms; set hive.exec.reducers.max=48; set mapred.reduce.tasks=48; set mapred.job.name=[HQL]pms_exps_prepro; drop table if exists pms.pms_exps_prepro; create table pms.pms_exps_prepro as select a.provinceid, a.cityid, a.ieversion, a.platform, '${date}' as ds from track_exps a;
上述文件中的sql脚本接收一个日期,接收参数写法类似 ${date} ,执行时如下执行: date=2015-10-22 hive -f pms_exps_prepro.sql --hivevar date=$date
2. hive转义字符的问题
下面以一个业务场景阐述关于hive转义字符的问题
track_exps记录曝光数据,现在小A希望获取2015-10-20有效的曝光数据
其中有效的曝光记录是指,
* relatedinfo 字段满足 数字.数字.数字.数字.数字 的格式,
例如 4.4.5.1080100.1 extfield1 字段满足 request-字符串,section-数字 的格式,
例如 request-b470805b620900ac492bb892ad7e955e,section-4
对于这个问题,小A写出了如下sql脚本: select * from track_exps where ds = '2015-10-20' and relatedinfo rlike '^4.\d+.\d+.\d+.\d+$' and extfield1 rlike '^request.+section-\d+$';
但是由于正则表达式是被包含在sql里面,所以里面的特殊字符需要转义
2.1. hive -e "SQL" 的方式执行
改动如下: hive -e " set mapred.job.queue.name=pms; explain select cityid from track_exps where ds = '2015-10-20' and relatedinfo rlike '\\^4\\.\\\d\\+\\.\\\d\\+\\.\\\d\\+\\.\\\d\\+\\$' and extfield1 rlike '\\^request\\.\\+section\\-\\\d\\+\\$';"
查看执行计划,可以确定正则表达式解析正确了: ... predicate: expr: ((relatedinfo rlike '^4.\d+.\d+.\d+.\d+$') and (extfield1 rlike '^request.+section-\d+$')) type: boolean ...
分析如下:
在 hive -e "SQL" 的执行方式中, "'正则表达式'" ,正则表达式先被一个单引号括起来,再被一个双引号括起来的,所以正则表达式里面, \\^ 的第一个 \ 用来解析第二个 \ ,第二个 \ 才真正起到了转义的作用
2.2. hive -f SQL文件 的方式执行
改动如下:
pms_exps_prepro.sql 文件内容如下: select * from track_exps where ds = '2015-10-20' and relatedinfo rlike '\^4\.\\d\+\.\\d\+\.\\d\+\.\\d\+\$' and extfield1 rlike '\^request\.\+section\-\\d\+\$';
分析如下:
不同于 hive -e "SQL" 的执行方式,因为是sql文件,所以正则表达式只被一个单引号括起来而已,一个 \ 就起到了转义的作用了
大数据
2018-07-31 17:46:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
导入模块: from pandas import DataFrame import pandas as pd import numpy as np
生成DataFrame数据 df = DataFrame(np.random.randn(4, 5), columns=['A', 'B', 'C', 'D', 'E'])
DataFrame数据预览: A B C D E 0 0.673092 0.230338 -0.171681 0.312303 -0.184813 1 -0.504482 -0.344286 -0.050845 -0.811277 -0.298181 2 0.542788 0.207708 0.651379 -0.656214 0.507595 3 -0.249410 0.131549 -2.198480 -0.437407 1.628228
计算各列数据总和并作为新列添加到末尾 df['Col_sum'] = df.apply(lambda x: x.sum(), axis=1)
计算各行数据总和并作为新行添加到末尾 df.loc['Row_sum'] = df.apply(lambda x: x.sum())
最终数据结果: A B C D E Col_sum 0 0.673092 0.230338 -0.171681 0.312303 -0.184813 0.859238 1 -0.504482 -0.344286 -0.050845 -0.811277 -0.298181 -2.009071 2 0.542788 0.207708 0.651379 -0.656214 0.507595 1.253256 3 -0.249410 0.131549 -2.198480 -0.437407 1.628228 -1.125520 Row_sum 0.461987 0.225310 -1.769627 -1.592595 1.652828 -1.022097
大数据
2018-07-31 16:43:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. 背景
最近在学习kylin,在安装的时候遇到一些坑,特意记录起来,也希望同样在学习kylin的人,少踩一些坑(要是连安装都过不去,还怎么学!!!)。
2. 环境
我选的kylin版本是1.5.4,因为买了一本叫《Apache kylin权威指南》,书中以1.5.x为蓝本,为了避免少踩坑,保持和书中版本一致。
关于kylin安装环境,参考 Hadoop Environment ,下面是我自己的环境,只是为了学习,所有的安装都是伪分布式的,也没有关注高可用 Ubuntu 14.04.5 LTS hadoop-2.7.1.tar.gz jdk-8u172-linux-x64.tar.gz hbase-1.2.5-bin.tar.gz apache-kylin-1.5.4-HBase1.x-bin.tar.gz apache-hive-1.2.1-bin.tar.gz
特别注意点 : kylin的版本要和hbase的版本对应,具体参考官网说明( Hadoop Environment ),其实kylin打包的名字也能看出来 注意hadoop和hbase的版本( hbase hadoop version ) jdk和hbase的版本( hbase jdk version ) hive和jdk版本( hive jdk version ) 最好在linux环境下安装,在mac下,启动kylin的时候,脚本会报错,当然可以改脚本 ( mac无法启动kylin )。此外,在Ubuntu下安装也不省心,启动kylin也会报错,改脚本吧。那么,最好使用centos,我尝试了,不会报错。
3. 安装 下载安装包,这个链接可以下载到apache所有的安装包,但速度不快,有些找不到的安装包,可以在这里下载( Apache Software Foundation Distribution Directory ),解压 设置环境变量 export JAVA_HOME=/root/jdk1.8.0_172 export HADOOP_HOME=/root/hadoop-2.7.1 export HIVE_HOME=/root/hive-1.2.1 export HBASE_HOME=/root/hbase-1.2.5 export KYLIN_HOME=/root/kylin-1.5.4 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HBASE_HOME/bin:$KYLIN_HOME/bin 安装hadoop,主要编辑的文件有:core-site.xml、hadoop-env.sh、hdfs-site.xml、mapred-site.xml、yarn-site.xml(都在$HADOOP_HOME/etc/hadoop目录内)( hadoop Pseudo-Distributed Operation ) ssh to localhost without a passphrase, execute the following commands $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys $ chmod 0600 ~/.ssh/authorized_keys hadoop-env.sh,加一行JAVA_HOME export JAVA_HOME=/root/jdk1.8.0_172 core.xml fs.defaultFS hdfs://localhost:9000 hadoop.tmp.dir /root/tmp hdfs-site.xml dfs.replication 1 mapred-site.xml:jobhistory记得也要配置,我之前没有配置,导致使用kylin的sampldata构建cube失败 mapreduce.framework.name yarn mapreduce.jobhistory.address localhost:10020 mapreduce.jobhistory.webapp.address localhost:19888 yarn-site.xml yarn.nodemanager.aux-services mapreduce_shuffle
然后,format namenode bin/hdfs namenode -format ,启动hdfs sbin/start-dfs.sh ,启动yarn sbin/start-yarn.sh , http://ip:50070/可以查namenode的情况,http://ip:8088/可以查看resourcemanager情况 安装hive:主要编辑的文件为hive-site.xml,此文件通过copy hive-default.xml.template而来( hive Installation and Configuration ) 在hdfs上面创建目录 $ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp $ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse 修改hive-site.xml,主要改的地方为 javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&characterEncoding=UTF-8&useSSL=false javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName hive javax.jdo.option.ConnectionPassword hive
上面就是使用mysql存储hive的元数据,如果出现SSL问题,就把useSSL置为false, 特别注意,不要使用默认的derby,否则在运行kylin的sampledata时,无法在hive中创建表 ,此外,还需要把hive-site.xml中的${system:java.io.tmpdir}和${system:user.name}分别替换成/tmp和${user.name},当然还需要把mysql-connector-java.x.jar二方包加入hive的lib目录中, 特别注意,使用5.x版本,不要使用6.x版本 。最后,运行 bin/hive 安装hbase:主要修改的文件为hbase-env.sh、hbase-site.xml( quickstart ) 修改hbase-env.sh,添加 export JAVA_HOME=/root/jdk1.8.0_172 修改hbase-site.xml hbase.rootdir hdfs://localhost:9000/hbase hbase.cluster.distributed true hbase.zookeeper.property.dataDir /root/tmp/hbase/zookeeper
**特别注意,对于伪分布式安装,hbase.cluster.distributed要设置为true。此外,这里使用hbase内置的zookeeper。**最后,执行bin/satrt-hbase.sh,启动hbase 安装kylin 修改check-env.sh:可以先执行 bin/check-env.sh ,一般来说配置了上面所述的环境变量,是可以通过check,但是这个脚本在mac和ubuntu下执行还是有问题,mac下的问题我没有解决,Ubuntu下面问题解了。原因是 get-properties.sh 内容在Ubuntu下执行有问题。不过在centos下没有这个问题( 安装指南 ) ## 原始文件 if [ $# != 1 ] then echo 'invalid input' exit -1 fi IFS=$'\n' result= for i in `cat ${KYLIN_HOME}/conf/kylin.properties | grep -w "^$1" | grep -v '^#' | awk -F= '{ n = index($0,"="); print substr($0,n+1)}' | cut -c 1-` do : result=$i done echo $result ## 修改后的文件 if [ $# != 1 ] then echo 'invalid input' exit -1 fi #IFS=$'\n' result=`cat ${KYLIN_HOME}/conf/kylin.properties | grep -w "^$1" | grep -v '^#' | awk -F= '{ n = index($0,"="); print substr($0,n+1)}' | cut -c 1-` #for i in `cat ${KYLIN_HOME}/conf/kylin.properties | grep -w "^$1" | grep -v '^#' | awk -F= '{ n = index($0,"="); print substr($0,n+1)}' | cut -c 1-` #do # : # result=$i #done echo $result 我目前使用的是apache-kylin-1.5.4-HBase1.x-bin.tar.gz版本,此版本在conf目录下,把压缩相关的配置注释了,包括kylin_hive_conf.xml、kylin_job_conf_inmem.xml、 kylin_job_conf.xml、kylin.properties,我之前用1.5.3并没有注释掉,导致在运行构建cube是出现snappy不存在问题。 # Compression codec for htable, valid value [none, snappy, lzo, gzip, lz4] # 1.5.3默认未snappy,但是我使用的hadoop的并没有snappy压缩功能,所以要么把压缩相关的配置注释掉,或者重新打包hadoop kylin.hbase.default.compression.codec=none
之后,运行 bin/kylin.sh satrt ,启动成功后,访问 http://ip:7070/kylin,用户名是ADMIN,密码是KYLIN。然后可以运行 bin/sample.sh ,体验下kylin,运行完 sample.sh 后重启kylin,就可以build cube了。
3. 其他 mac无法启动kylin 最好使用linux 运行sample.sh后hive无法找到表 不要使用derby,使用mysql build cube出现 Unexpected exception: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support. 重新编译hadoop,让其支持snappy: https://blog.csdn.net/wzy0623/article/details/51263041 关闭kylin的压缩配置: http://kylin.apache.org/docs15/install/advance_settings.html kylin 安装配置实验
大数据
2018-07-31 16:33:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
跟Hadoop的无缝集成使得使用MapReduce对HBase的数据进行分布式计算非常方便,本文将以前面的blog示例,介绍HBase下MapReduce开发要点。很好理解本文前提是你对Hadoop MapReduce有一定的了解。
HBase MapReduce核心类介绍
首先一起来回顾下MapReduce的基本编程模型,
可以看到最基本的是通过Mapper和Reducer来处理KV对,Mapper的输出经Shuffle及Sort后变为Reducer的输入。除了Mapper和Reducer外,另外两个重要的概念是InputFormat和OutputFormat,定义了Map-Reduce的输入和输出相关的东西。HBase通过对这些类的扩展(继承)来方便MapReduce任务来读写HTable中的数据。
实例分析
我们还是以最初的blog例子来进行示例分析,业务需求是这样:找到具有相同兴趣的人,我们简单定义为如果author之间article的tag相同,则认为两者有相同兴趣,将分析结果保存到HBase。除了上面介绍的blog表外,我们新增一张表tag_friend,RowKey为tag,Value为authors,大概就下面这样。
我们省略了一些跟分析无关的Column数据,上面的数据按前面描述的业务需求经过MapReduce分析,应该得到下面的结果
实际的运算过程分析如下
代码实现
有了上面的分析,代码实现就比较简单了。只需以下几步
定义Mapper类继承TableMapper,map的输入输出KV跟上面的分析一致。public static class Mapper extends TableMapper {
public Mapper() {}
@Override
public void map(ImmutableBytesWritable row, Result values,Context context) throws IOException {
ImmutableBytesWritable value = null;
String[] tags = null;
for (KeyValue kv : values.list()) {
if ("author".equals(Bytes.toString(kv.getFamily()))
&& "nickname".equals(Bytes.toString(kv.getQualifier()))) {
value = new ImmutableBytesWritable(kv.getValue());
}
if ("article".equals(Bytes.toString(kv.getFamily()))
&& "tags".equals(Bytes.toString(kv.getQualifier()))) {
tags = Bytes.toString(kv.getValue()).split(",");
}
}
for (int i = 0; i < tags.length; i++) {
ImmutableBytesWritable key = new ImmutableBytesWritable(
Bytes.toBytes(tags[i].toLowerCase()));
try {
context.write(key,value);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
}
复制代码
定义Reducer类继承TableReducer,reduce的输入输出KV跟上面分析的一致。public static class Reducer extends TableReducer {
@Override
public void reduce(ImmutableBytesWritable key,Iterable values,
Context context) throws IOException, InterruptedException {
String friends="";
for (ImmutableBytesWritable val : values) {
friends += (friends.length()>0?",":"")+Bytes.toString(val.get());
}
Put put = new Put(key.get());
put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
Bytes.toBytes(friends));
context.write(key, put);
}
}
复制代码
在提交作业时设置inputFormat为TableInputFormat,设置outputFormat为TableOutputFormat,可以借助TableMapReduceUtil类来简化编码。public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf = HBaseConfiguration.create(conf);
Job job = new Job(conf, "HBase_FindFriend");
job.setJarByClass(FindFriend.class);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("author"),Bytes.toBytes("nickname"));
scan.addColumn(Bytes.toBytes("article"),Bytes.toBytes("tags"));
TableMapReduceUtil.initTableMapperJob("blog", scan,FindFriend.Mapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
TableMapReduceUtil.initTableReducerJob("tag_friend",FindFriend.Reducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
复制代码
对大数据感兴趣的朋友可以加我的群 615997810 一起交流学习,还有免费资料可以领取 1,_推荐系统理论与实战项目 Part2
2,推荐系统理论与实战 项目Part1
3.实时交易监控系统项目(下)
4,实时交易监控系统项目(上)
5,用户行为分析系统项目
6,分布式全文搜索引擎ElasticSearch Part2
7,大数据批处理之HIVE详解
8,ES公开课 part1
9,spark_streaming_
10,数据仓库搭建详解
11,大数据任务调度
12,流数据集成神器Kafka
13,Spark 公开课
14,海量日志收集利器:Flume
15,Impala简介
16,Hive简介
17,MapReduce简介
18海量数据高速存取数据库 HBase
19,浅谈Hadoop管理器yarn原理

小结
本文通过实例分析演示了使用MapReduce分析HBase的数据,需要注意的这只是一种常规的方式(分析表中的数据存到另外的表中),实际上不局限于此,不过其他方式跟此类似。如果你进行到这里,你肯定想要马上运行它看看结果,希望大家多多关注哦
大数据
2018-07-31 15:54:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
上一篇文章,博主介绍了zookeeper集群的搭建全过程;今天博主将为大家分享的是zookeeper的命令行客户端使用、zookeeper的数据结构和监听功能。
zookeeper启动与关闭服务的命令: ./bin/zkServer.sh start 开启zookeeper ./bin/zkServer.sh stop 停止zookeeper
一、zookeeper数据结构
zookeeper特性:
1)、Zookeeper:一个leader,多个follower组成的集群
2)、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
3)、分布式读写,更新请求转发,由leader实施
4)、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
5)、数据更新原子性,一次数据更新要么成功,要么失败
6)、实时性,在一定时间范围内,client能读到最新数据
zookeeper数据结构:
1)、层次化的目录结构,命名符合常规文件系统规范(见下图)
2)、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
3)、节点Znode可以包含数据和子节点(EPHEMERAL短暂类型的节点不能有子节点)
4)、客户端应用可以在节点上设置监视器
数据结构的图:
节点类型
1)、Znode有两种类型:
短暂(ephemeral)(断开连接自己删除)
持久(persistent)(断开连接不删除)
2)、Znode有四种形式的目录节点(默认是persistent )
PERSISTENT -----永久节点
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) ------永久序号节点
EPHEMERAL -----临时节点(短暂节点)
EPHEMERAL_SEQUENTIAL ------短暂序号节点
3)、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
4)、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
二、zookeeper命令行客户端初识
zookeeper的客户端和其它可执行命令都在其bin目录中,分别是zkCli.sh和zkCli.cmd,前者是Linux中使用的客户端启动脚本,后者是windows系统中的客户端启动脚本。
1)、运行 zkCli.sh –server 进入命令行工具:./zkCli.sh -server 192.168.29.135 cd /opt/apps/zookeeper-3.4.13/bin ./zkCli.sh -server 192.168.29.135 如果不指定-server参数,连接的是本机ip
2)、命令行客户端连接上zookeeper后,输入help可查看zookeeper命令帮助
3)、客户端命令例子 1、使用 ls 命令来查看当前 ZooKeeper 中所包含的内容: [zk: 192.168.29.135(CONNECTED) 1] ls / 2、创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串,默认为创建永久节点: [zk: 192.168.29.135(CONNECTED) 2] create /zk "myData“ 3、我们运行 get 命令来确认 znode 是否包含我们所创建的字符串: [zk: 192.168.29.135(CONNECTED) 3] get /zk //数据描述信息 "" cZxid=0x30000004创建事务id ctime=Tue apr 05 02:35:02 CST 2016 mZxid=0x30000004修改事务id mtime=Tue apr 05 02:35:02 CST 2016 cversion=1创建版本号, dataversion=1 数据版本号 aclversion=0 权限版本号 ephemeralOwner dataLength=5数据长度 numChildren=1子节点数 #监听这个节点的变化,当另外一个客户端改变/zk时,它会打出下面的 #WATCHER:: #WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk [zk: 192.168.29.135(CONNECTED) 4] get /zk watch 4、下面我们通过 set 命令来对 zk 所关联的字符串进行设置: [zk: 192.168.29.135(CONNECTED) 5] set /zk "zsl“ 5、下面我们将刚才创建的 znode 删除,只能删除叶子节点: [zk: 192.168.29.135(CONNECTED) 6] delete /zk 6、删除节点,可删除非叶子节点及其下的子节点的所有数据:rmr [zk: 192.168.29.135(CONNECTED) 7] rmr /zk 7、创建短暂节点(客户端退出后自动删除) [zk: 192.168.29.135(CONNECTED) 8] create -e /aap2 “bbb” 8、带序号的短暂节点,zookeeper在创建带序号的节点时,如果从来没创建过,序号冲0开始,反之从1开始 [zk: 192.168.29.135(CONNECTED) 9] create -s -e /aap2 “bbb” 9、设置值 [zk: 192.168.29.135(CONNECTED) 10] set /app1 bbbb 10、拿数据并且注册监听本节点数据的变化(监听只生效一次) [zk: 192.168.29.135(CONNECTED) 11] get /app1 watch 11、获取子节点变化的监听 [zk: 192.168.29.135(CONNECTED) 12] ls /app1 watch 12、quit退出zookeeper客户端 [zk: 192.168.29.135(CONNECTED) 12] quit
三、zookeeper命令行客户端命令详解
1)、connect命令,连接zk服务端,(与close命令配合使用可以连接或者断开zk服务端),可用于更换连接的zookeeper服务器:
2)、close命令,用于关闭与服务端的链接
3)、get命令,用于获取节点的信息,注意节点的路径必须是以/开头的绝对路径。如get /
数据详细信息说明: cZxid:节点创建时的zxid ctime:节点创建时间 mZxid:节点最近一次更新时的zxid mtime:节点最近一次更新的时间 cversion:子节点数据更新次数 dataVersion:本节点数据更新次数 aclVersion:节点ACL(授权信息)的更新次数 ephemeralOwner:如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0 dataLength:节点数据长度,本例中为根节点/的长度 numChildren:子节点个数
4)、stat命令,用于查看节点的状态信息,如stat /;该命令的结果参数说明同get命令
5)、set命令,用于设置节点的数据,如:set /userid2 32222
6)、ls命令是用于获取路径下的节点信息,注意路径为绝对路径,如:ls2 /userid2
7)、ls2命令是ls命令的增强版,比ls命令多输出本节点信息,如:ls2 /userid2
8)、listquota命令用于显示配额,如listquota /userid3
9)、setquota命令用于设置节点个数以及数据长度的配额,如: setquota –n 4 /userid3 设置/userid3子节点个数最大为4 setquota –b 100 /userid3 设置/userid3节点长度最大为100
10)、delquota命令用于删除配额,-n为子节点个数,-b为节点数据长度,如:delquota –n 2
11)、history用于列出最近的命令历史,可以和redo配合使用。如history
12)、redo命令用于再次执行某个命令,使用方式为redo cmdid 如 redo 20,常与history配合使用
13)、create命令用于创建节点,其中-s为顺序充点,-e临时节点
14)、delete命令用于删除节点,如delete /userid3
15)、addauth命令用于节点认证,使用方式:如addauth digest username:password
16)、setAcl命令用于设置节点Acl
Acl由三部分构成:1为scheme,2为user,3为permission,一般情况下表示为scheme:id:permissions
17)、getAcl命令获取节点的Acl,如getAcl /node1
18)、sync命令用于强制同步,由于请求在半数以上的zk server上生效就表示此请求生效,那么就会有一些zk server上的数据是旧的。sync命令就是强制同步所有的更新操作。
19)、printWatchers命令用于设置和显示监视状态,值为on或则off
20)、quit命令退出客户端
大数据
2018-07-30 22:54:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一开始写爬虫的时候并不知道这个东西,后来写的爬虫逐渐复杂了,json出现的次数也多了起来,这让我很好奇,于是我百度了一下,对于json百度是这样解释的
看完是不是有点懵?我当时也是完全不理解,对于这段话,其实不用深究,只要抓住关键词“数据交换格式”,也就是说json本质上就是一种格式(也可以理解为数据表示形式)。简单粗暴的举个例子, json = '{"a": "Hello", "b": "World"}' #这种形式就叫json,是不是和python里的字典的形式非常像?json的对象也就是以json形式表示的数据的本质就是一个字符串。
那么json有什么用呢,从百度给的解释来看主要是用于数据的交换,比如,要把python中的数据传给js,那么就可以把数据先转为json格式,再传给js,这样js就可以很方便的对数据进行处理了,json的作用就是这样。
现在版本比较高的python都内置了json库,调用的时候只要import json就可以了。
json使用起来也很简单,基本只要用到四个函数load(),loads(),dump(),dumps()至于他们的用法,就去问度娘吧,网上有很多解释,这里简单提一下,dump就是将数据转为json格式,而load()就是将json格式数据转为自己的格式,比如将json转为python中的字典,然后就可以用对字典的操作处理数据了。一般在从js里面提取信息时会用到json。
像我一样的小白总是一上来就被生硬的解释吓到了,但其实了解的多了,抓住其本质,理解起来还是不难的。
大数据
2018-07-30 18:52:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
本篇文章是接上一篇《超详细hadoop虚拟机安装教程(附图文步骤)》,上一篇有人问怎么没写hadoop安装。在文章开头就已经说明了,hadoop安装会在后面写到,因为整个系列的文章涉及到每一步的截图,导致文章整体很长。会分别先对虚拟机的安装、Linux系统安装进行介绍,然后才会写到hadoop安装,关于hadoop版本我使用的是大快搜索三节点发行版DKhadoop。(三节点的DKHadoop发行版可以自己去大快网站页面下载,目前是开放所有权限的,也就是免费版本和付费版本的权限一样,不知道以后会不会限制权限,至少目前是没有的) Hadoop安装的分享最迟下周可以整理完,整理好便会分享给大家的。本篇重点分享的是hadoop运行操作系统的安装说明,还是以图文的形式,步骤比较多,看完需要耐心的!
申明:平台运行于Linux环境下,为便于操作,本例中统一使用64位CentOS系统版本号6.5进行操作介绍,如无特殊说明本文档所述所有操作均为此版本Linux为准。 参考网址: http://www.xitongzhijia.net/linux/201603/69281.html
安装准备 将系统安装光盘放入光驱中,加电启动服务器并进入BIOS设置界面(因服务器品牌不同对应主板系统不同、快捷键也不同,请按实际情况为主),进入BIOS设置界面的操作根据服务器型号不同操作也不一样,具体操作请咨询服务器硬件提供商,通常的操作是在启动界面按F2或F10键,BIOS设置界面与下图类:
1、将服务器光盘启动调整至第一位置,如下图所示。
2、按F10保存退出,服务器会进入系统安装界面。
安装步骤 1、服务器启动之安装选择界面,如下图所示,选择第一项:
2、出现是否对CD媒体进行测试的提问,这里选择“Skip”跳过测试。
3、出现下面的界面,点击next:
4、选择语言为:中文(简体)见图4-1;键盘选择:美国英语式,见图4-2;
图4-1
图4-2 5、选择第一项,基本存储设备
6、选择忽略所有数据
7、设定主机名称,以node为例(可按需要自行设定)见图7-1,在设置名称的同时把网络也设置上,见图7-2 图7-1 图7-2
8、设置自动连接、IPv4设置、设置成手动连接
9、时区选择:亚洲上海,"系统时钟使用UTC" 前面打勾,使用UTC时间
10、设定root账户密码,根据实际需要设定,这是以后管理系统所需要的凭证: 图10-1 如果出现以下提示(见图10-2) 选择“无论如何都要使用” 图10-2 11、选择第五项(创建自定义布局)、查看并修改分区布置,点击下一步:
12、删除默认分区
13、创建新分区
14、按图所选 15、创建第二个分区 16、按图所选
17、设置完之后如图
18、选择将修改写入磁盘:
19、进入如图所示步骤,选择下一步
20、如下图所示选择“Desktop”和“以后自定义”点击下一步
其他选项默认即可,下一步即可开始安装软件包,安装完成后点击“重新引导”完成安装并重启服务器。
21、第一次启动如下图所示
22、勾选“是,我同意协议”,之后一直点击前进即可完成安装。
大数据
2018-07-30 15:23:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1、new Thread的弊端
执行一个异步任务你还只是如下new Thread吗?
new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start();
那你就out太多了,new Thread的弊端如下:
a. 每次new Thread新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。
相比new Thread,Java提供的四种线程池的好处在于:
a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。

2、 Java 线程池
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
(1). newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); }
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

(2). newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。

(3) newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS);
表示延迟3秒执行。

定期执行示例代码如下:
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
表示延迟1秒后每3秒执行一次。
ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。

(4)、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
结果依次输出,相当于顺序执行各个任务。
现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操作,文件操作,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操作。
大数据
2018-07-30 10:35:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
本篇文章,博主将分享zookeeper集群环境搭建的整个过程。在分享之前,博主还得说明些注意事项。首先,zookeeper是java编写的应用程序,所以在安装之前需要先安装java环境。
本次可能用到的操作: sucureCRT -> view -> chat windows可以批量发送命令 send chat to all session rz sucureCRT命令行输入rz命令执行文件上传选择

集群安装步骤(本次以三台zookeeper服务器集群为例):
(1)首先,将三台服务器上的java环境安装好(由于之前文章已经介绍过,故此处不再重复)
(2)登录zookeeper官网下载安装包;地址:http://zookeeper.apache.org/
(3)上传zookeeper到其中一台服务器的/opt/apps目录中(拖拽zookeeper压缩包到SecureCRT中,选中send Zmodem)

报错:-bash: rz: command not found
(4)由于上一步提示rz上传命令未安装,先安装rz软件 yum list|grep rz yum install lrzsz
(5)重复步骤3进行文件上传成功
(6)解压zookeeper软件安装压缩包:tar -zxvf zookeeper-3.4.13.tar.gz -C /opt/apps(解压)
(7)修改zookeeper配置文件 cd /opt/apps/zookeeper-3.4.13/conf/ cp zoo_sample.cfg zoo.cfg vi zoo.cfg
(8)创建zookeeper数据文件并且设置权限 cd /opt/apps/zookeeper-3.4.13/ mkdir -m 755 data mkdir -m 755 log
(9) 在/opt/apps/zookeeper-3.4.13/data文件夹下 新建myid文件,myid的文件内容为server.3后面的3(此处需参照当前服务器id进行配置) cd data vi myid 或者 echo "3" > myid
(10)将集群下发到其他两台机器上 scp -r /opt/apps/zookeeper-3.4.13 root@192.168.29.135:/opt/apps/ scp -r /opt/apps/zookeeper-3.4.13 root@192.168.29.136:/opt/apps/
(11)修改其他机器的配置文件 192.168.29.135上:修改myid为:1 192.168.29.135上:修改myid为:2
(12)关闭防火墙(此处为节省时间也不配防火墙,且生产中一般为内网使用,故都不需要配置防火墙) service iptables stop chkconfig iptables off
(13)启动zookeeper;执行命令:/opt/apps/zookeeper-3.4.13/bin/zkServer.sh start
(14)查看zookeeper集群状态,(QuorumPeerMain为zookeeper的进程名称) 1、 jps(查看进程) 2、 /opt/apps/zookeeper-3.4.13/bin/zkServer.sh status(查看集群状态,主从信息)

(15)集群搭建完成

最后,大家如果觉得博主文章写的不错,或者对您有帮助,请点赞博主。如果您对其它服务器技术或者博主本人感兴趣,请关注博主,随时欢迎同博主交流技术。

大数据
2018-07-29 21:55:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、zookeeper角色概念
zookeeper集群中有三种角色(zookeeper服务器节点),它们分别是:群首(leader),追随者(follower),观察者(observer)。由于观察者这种节点我们工作中基本不会接触到,故此只重点讲述前两种角色。
群首(leader):Leader作为整个ZooKeeper集群的主节点,负责响应所有对ZooKeeper状态变更的请求。它会将每个状态更新请求进行排序和编号,以便保证整个集群内部消息处理的FIFO。
这里补充一下ZooKeeper的请求类型。对于exists,getData,getChildren等只读请求,收到该请求的zk服务器将会在本地处理,因为由其内部的ZAB理论实现,每个服务器看到的名字空间内容都是一致的,无所谓在哪台机器上读取数据,因此如果ZooKeeper集群的负载是读多写少,并且读请求分布得均衡的话,效率是很高的。对于create,setData,delete等有写操作的请求,则需要统一转发给leader处理,leader需要决定编号、执行操作,这个过程称为一个事务(transaction)。
追随者(follower):Follower主要是响应本服务器上的读请求外,另外follower还要处理leader的提议,并在leader提交该提议时在本地也进行提交。另外需要注意的是,leader和follower构成ZooKeeper集群的法定人数,也就是说,只有他们才参与新leader的选举、响应leader的提议。
二、选举原理
(1) 第一种情况:集群是全新的集群。
以一个简单的例子来说明整个选举的过程.
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是 一样的.假设这些服务器依序启动,来看看会发生什么:

1) 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态
2) 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.
3) 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.
4) 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能成为follower了.
5) 服务器5启动,同4一样,成为follower.
(2)第二种情况:非全新集群的选举机制(数据恢复)
那么,初始化的时候,是按照上述的说明进行选举的,但是当zookeeper运行了一段时间之后,有机器down掉,重新选举时,选举过程就相对复杂了。
需要加入数据id、leader id和逻辑时钟。
数据id:数据新的id就大,数据每次更新都会更新id。
Leader id:就是我们配置的myid中的值,每个机器一个。
逻辑时钟:这个值从0开始递增,每次选举对应一个值,也就是说: 如果在同一次选举中,那么这个值应该是一致的 ; 逻辑时钟值越大,说明这一次选举leader的进程更新.
选举的规则就变成:
1、逻辑时钟小的选举结果被忽略,重新投票
2、统一逻辑时钟后,数据id大的胜出
3、数据id相同的情况下,leader id大的胜出
根据这个规则选出leader。

三、Zookeeper集群-应用程序的工作原理

以上是博主今天对zookeeper角色工作原理介绍的全部内容,由于本教程的重点是大数据部分,故此处zookeeper的介绍不会详细到专家级。如果大家对zookeeper底层原理或其它知识感兴趣,请点赞并关注博主,博主将在后期推出zookeeper的详细教程。最后的最后,如果大家觉得文章不错,请为博主点赞。
大数据
2018-07-29 00:30:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
容器构建时需要下载多种软件,往往这是非常耗时间的。hub.docker.com本来就慢,尤其是遇到存放在gcr.io/aws等上面的模块就挂了,pip安装python模块是也较慢,conda的下载更是如蜗牛。
加快容器构建时的下载速度,有多种方法:
1、放在“外面的服务器”构建,然后传送到aliyun等镜像,下载速度就会快很多很多。 步骤可以参考: 在阿里云创建Kubernetetes-1.11.0镜像服务(高速) 系统盘不够的话,参考: 如何给容器服务的Docker增加数据盘
2、添加proxy和pip、conda的镜像。如下是给jupyterhub环境下使用构建的一个singleuser镜像。 # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. FROM jupyter/all-spark-notebook:5811dcb711ba LABEL maintainer="Databook Project,https://github.com/databooks" USER root # ==================================================================== # Add proxy, using --build-arg "HTTP_PROXY=http://192.168.199.99:9999" ENV HTTP_PROXY ${HTTP_PROXY} ENV HTTPS_PROXY ${HTTP_PROXY} ENV http_proxy ${HTTP_PROXY} ENV https_proxy ${HTTP_PROXY} #Add conda install mirror: RUN echo $http_proxy && \ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ && \ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ && \ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/ && \ conda config --set show_channel_urls yes #Add pip install mirror: RUN echo "[global] \ index-url = http://pypi.tuna.tsinghua/simple \ trusted-host = \ pypi.tuna.tsinghua \ timeout = 120 \ " > /etc/pip.conf # ==================================================================== # ==================================================================== USER $NB_UID RUN pip install --upgrade pip RUN pip install bs4 && \ pip install lxml && \ pip install ipyleaflet && \ pip install py4j && \ pip install pyspark && \ pip install mlflow && \ pip install airflow && \ pip install tushare RUN conda update -n base conda RUN conda install -y -c conda-forge nodejs=8.10.0 && \ conda install -y -c conda-forge tensorflow=1.8.0 && \ jupyter labextension install jupyter-leaflet # ==================================================================== ENV HTTP_PROXY "" ENV HTTPS_PROXY "" ENV http_proxy "" ENV https_proxy "" # ====================================================================
注意: 注意要加到docker里面,在宿主机设置没有用的,因为docker build是在独立的容器中运行的。 build时使用docker build --build-arg "HTTP_PROXY=http://192.168.199.99:9999"将proxy参数传进去,结束时将http_proxy等参数清除。这样可以避免将proxy信息写入镜像。 注意改成自己的服务地址。 这里使用了清华的pip和conda镜像,也可以换成其它的。参考下面的信息: Ubuntu 配置 pip.conf 添加国内源
更多的参考: Databook-数据之书
大数据
2018-07-28 23:24:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、概念
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:主从协调、服务器节点动态上下线、配置维护、域名服务、分布式同步(分布式共享锁)、组服务等。ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
Zookeeper 本身就是一个分布式集群程序, 集群的角色:Leader 和follower(Observer)只要集群中有半数以上节点存活,集群就能提供服务。由于zookeeper的以上特性,集群部署时,需安装为基数太。

二、 原理
ZooKeeper是以Fast Paxos算法为基础的,Paxos 算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥导致没有一个proposer能提交成功,而Fast Paxos作了一些优化,通过选举产生一个leader (领导者),只有leader才能提交proposer,具体算法可见Fast Paxos。因此,要想弄懂ZooKeeper首先得对Fast Paxos有所了解。
ZooKeeper的基本运转流程:
1、选举Leader。
2、同步数据。
3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。
4、Leader要具有最高的执行ID,类似root权限。
5、集群中大多数的机器得到响应并接受选出的Leader。
三、特点
在Zookeeper中,znode是一个跟Unix文件系统路径相似的节点,可以往这个节点存储或获取数据。如果在创建znode时Flag设置为EPHEMERAL(短暂的),那么当创建这个znode的节点和Zookeeper失去连接后,这个znode将不再存在在Zookeeper里,Zookeeper使用Watcher察觉事件信息。当客户端接收到事件信息,比如连接超时、节点数据改变、子节点改变,可以调用相应的行为来处理数据。Zookeeper的Wiki页面展示了如何使用Zookeeper来处理事件通知,队列,优先队列,锁,共享锁,可撤销的共享锁,两阶段提交。

四、Zookeeper文件系统
每个子目录项如 NameService 都被称作为znode,和文件系统一样,我们能够自由的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。
有四种类型的znode:
1、PERSISTENT-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2、PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
3、EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

五、Zookeeper通知机制
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。

六、zookeeper使用场景
(1)采集任务接管
(2)服务主从选举
(3)分布式共享锁
(4)配置中心

最后,zookeeper的使用场景当然不局限于博主举例出来的这几种,还有如dubbo将其使用为服务器注册中心、负载均衡等等;如果您对这块知识非常感兴趣,请关注博主并欢迎同博主交流。今天的教程就到这里,如果大家觉得博主的分享不错,请点赞支持博主。

参考文献:https://yq.aliyun.com/articles/588640
大数据
2018-07-28 22:55:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
前两天看到有人留言问在什么情况下需要部署hadoop,我给的回答也很简单,就是在需要处理海量数据的时候才需要考虑部署hadoop。关于这个问题在很早之前的一篇分享文档也有说到这个问题,数据量少的完全发挥不了hadoop的优势,所以也没必要部署。但对于正在学习hadoop的朋友来说,hadoop运行环境部署真的是件非常头疼的事情。 计划在接下来的一段时间里,以我之前学习研究大快搜索DKHadoop时经验为基础,给大家整理分享虚拟机的安装、操作系统安装、服·务·器操作系统配置、DKH系统安装等相关内容。文章篇幅可能会比较长,所以每篇就只分享一到两个点。本篇给大家分享DKHadoop虚拟机的环境安装。
虚拟机的安装
1、下载的链接地址可参考: https://download3.vmware.com/software/wkst/file/VMware-workstation-full-12.5.7-5813279.exe
2、详细安装步骤: (1)在配置类型选项中,我们选择自定义(高级),选择好之后点击“下一步”
(2)虚拟机的硬件兼容性设置页面,参考下图设置好之后点击“下一步”;
(3)在安装客户机操作系统界面我们选择最后一个“稍后安装操作系统”(见图3-1),然后点击下一步,在客户机操作系统中选择“Linux”(见图3-2)。 图3-1 图3-2
(4)为虚拟机命名,如下图4-1,(命名按照自己喜欢即可),点击“下一步”进行处理器配置。见图4-2 图4-1 图4-2
(5)设置虚拟机的内存,见下图
(6)网络连接类型有四种,在四种网络连接类型中选择“使用桥连接网络”,点击“下一步”;
(7)SCSI控制器类型选择LSI Logic 的推荐类型即可;
(8)磁盘类型同样选择推荐的SCSI;
(9)选择创建新虚拟磁盘(如图9-1),点击下一步设置磁盘容量,详细见图9-2 图9-1 图9-2
(10)设置磁盘存储位置
(11)按照上述步骤完成设置后,单击下图中的“完成”即可,至此虚拟机安装成功。
大数据
2018-07-28 13:06:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
#前言
今天被问题卡了半天,最后察觉会不会datanode访问不了呢?在团哥的指导下...才发现自己犯了个愚蠢的错误,哎,不说罢。丢人,太粗心了。经过这几天,终于把Hadoop 3.0.3 和 Hive 3.0 搭建好了..不容易啊,搭建好了之后如何使用呢?木知....

#上码 1,下载,解压,变名 wget http://mirrors.hust.edu.cn/apache/hive/hive-3.0.0/apache-hive-3.0.0-bin.tar.gz tar -xzvf apache-hive-3.0.0-bin.tar.gz mv apache-hive-3.0.0-bin hive 2,配置环境 vim /etc/profile (在后面追加) export HIVE_HOME=/home/hive 3,安装Mysql sudo apt-get install mysql-server sudo apt-get install libmysql-java ln -s /usr/share/java/mysql-connector-java.jar $HIVE_HOME/lib/mysql-connector-java.jar 4,导入数据 $ mysql -u root -p mysql> CREATE DATABASE metastore; mysql> USE metastore; mysql> SOURCE $HIVE_HOME/scripts/metastore/upgrade/mysql/hive-schema-3.0.0.mysql.sql; mysql> CREATE USER 'hive'@'%' IDENTIFIED BY 'hive'; mysql> GRANT all on *.* to 'hive'@localhost identified by 'hive'; mysql> flush privileges; 5,配置hive 环境 (/home/hive/conf) cp hive-env.sh.template hive-env.sh vim hive-env.sh export HADOOP_HOME=/home/hadoop export HIVE_CONF_DIR=/home/hive/conf cp hive-default.xml.template hive-site.xml vim hive-site.xml (配置路径与mysql) system:java.io.tmpdir /user/hive/warehouse system:user.name ${user.name} hive.metastore.db.type mysql javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName hive user name for connecting to mysql server javax.jdo.option.ConnectionPassword hive password for connecting to mysql server 6, 创建临时目录 $HADOOP_HOME/bin/hadoop fs -mkdir -p /tmp $HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse 7,初始化hive schematool -dbType mysql -initSchema 8,启动 metastore服务 (不启用会报:HiveException java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient。) ./hive --service metastore & 9,进入Hive $HIVE_HOME/bin/hive #创建表 hive (default)> CREATE TABLE IF NOT EXISTS test_table (col1 int COMMENT 'Integer Column', col2 string COMMENT 'String Column' ) COMMENT 'This is test table' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE; hive (default)> show tables; tab_name test_table #写入 hive (default)> insert into test_table values(1,'aaa'); MapReduce Jobs Launched: Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 5.54 sec HDFS Read: 15408 HDFS Write: 243 SUCCESS Total MapReduce CPU Time Spent: 5 seconds 540 msec OK col1 col2 Time taken: 26.271 seconds #查询 hive (default)> select * from test_table; test_table.col1 test_table.col2 2 bbb 3 ccc 4 ddd Time taken: 0.205 seconds, Fetched: 3 row(s) 10,had001 jps root@had001:/home/hive# jps 6675 SecondaryNameNode 6426 NameNode 6908 ResourceManager 8382 Jps 11,had002,had003 jps root@had002:~# jps 3300 DataNode 3430 NodeManager 5610 Jps #查看是否能连接had001 root@had002:~# /home/hadoop/bin/hdfs dfsadmin -report root@had003:~# /home/hadoop/bin/hdfs dfsadmin -report #正常有data目录 root@had002:~# tree /usr/local/hadoop/tmp /usr/local/hadoop/tmp ├── dfs │ └── data │ ├── current │ │ ├── BP-1834162669-172.17.252.52-1532682436448 │ │ │ ├── current │ │ │ │ ├── finalized 12,错误 1,Exception in thread "main" java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal character entity: expansion character (code 0x8 at [row,col,system-id]: [3213,96,"file:/home/appleyuchi/apache-hive-3.0.0-bin/conf/hive-site.xml"] 解决: /home/appleyuchi/apache-hive-3.0.0-bin/conf/hive-site.xml 上面的第3213行,第96个字符是非法字符,注释掉就行了 2,hadoop cluder could only be written to 0 of the 1 minReplication nodes 原因是had002,had003连不了had001

参考
https://dzone.com/articles/how-configure-mysql-metastore
http://dwgeek.com/hive-create-table-command-examples.html/
https://blog.csdn.net/yuyanhsf/article/details/81000522
您有什么不同的意见或看法? 欢迎留言共同学习,谢谢。
本文链接: http://www.hihubs.com/article/343
关键字:Hadoop 3.0.3 + Hive3.0安装
若无特别注明,文章皆为 Hubs'm 原创,转载请注明出处...O(∩_∩)O
大数据
2018-07-27 21:06:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Error Message org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'javax.jms.ConnectionFactory' available: expected single matching bean but found 2: activeMQConnectionFactory,springFactory
今天使用springboot Demo ActiveMQ过程中,注入 ConnectionFactory 时,SpringBoot 自动配置时发现因为多个候选bean注入失败,解决此问题有两个方法: // 1. 当bean定义阶段和bean注入阶段代码都可以修改时优先采用@Qualifier注解在bean注入阶段解决,使用之前需要先获得bean的名称; @Autowired @Qualifier(value = "activeMQConnectionFactory") public ActiveMQConnectionFactory mqConnectionFactory; @Bean(name = "activeMQConnectionFactory") public ActiveMQConnectionFactory activeMQConnectionFactory(){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(BROKERURL); activeMQConnectionFactory.setUserName(USERNAME); activeMQConnectionFactory.setPassword(PASSWORD); activeMQConnectionFactory.setTrustAllPackages(true); return activeMQConnectionFactory; } @Bean(name = "springFactory") // @Primary public CachingConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(mqConnectionFactory); factory.setSessionCacheSize(100); factory.setCacheConsumers(true); factory.setCacheProducers(true); return factory; } // 2. 但遇到像springboot自动配置这种情况,无法改动@AutoWired阶段的代码,所以只能采取在@Bean阶段加上注解配置@primary @Bean(name = "springFactory") @Primary // 默认优先调用该bean public CachingConnectionFactory connectionFactory(){ CachingConnectionFactory factory = new CachingConnectionFactory(mqConnectionFactory); factory.setSessionCacheSize(100); factory.setCacheConsumers(true); factory.setCacheProducers(true); return factory; }
大数据
2018-07-27 16:30:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
####错误如下 org.springframework.jms.support.converter.MessageConversionException: Could not convert JMS message; nested exception is javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mercury.mq.service.MyMessage! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
解决方案 //工厂方法中设置setTrustAllPackages(),true表示信任所有package activeMQConnectionFactory.setTrustAllPackages(true); // 另外也可以设置信任列表 factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
大数据
2018-07-27 11:06:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
BinderHub 安装指南
BinderHub uses Helm Charts to set up the applications we’ll use in our Binder deployment. If you’re curious about what Helm Charts are and how they’re used here, see the Zero to JupyterHub guide .
Below we’ll cover how to configure your Helm Chart, and how to create your BinderHub deployment.
3.1. Preparing to install
To configure the Helm Chart we’ll need to generate several pieces of information and insert them into yaml files.
First we’ll create a folder where we’ll store our BinderHub configuration files. You can do so with the following commands: mkdir binderhub cd binderhub
Now we’ll collect the information we need to deploy our BinderHub. The first is the content of the JSON file created when we set up the container registry. For more information on getting a registry password, see Set up the container registry . We’ll copy/paste the contents of this file in the steps below.
Create two random tokens by running the following commands then copying the outputs.: openssl rand -hex 32 openssl rand -hex 32
Note
This command is run twice because we need two different tokens.
3.2. Create secret.yaml file
Create a file called secret.yaml and add the following: jupyterhub: hub: services: binder: apiToken: "" proxy: secretToken: ""
Next, we’ll configure this file to connect with our registry.
3.2.1. If you are using gcr.io
Add the following section to secret.yaml . Note that the first line is not indented at all: registry: password: | { "type": "", "project_id": "", "private_key_id": "", "private_key": "", "client_email": "", "client_id": "", "auth_uri": "", "token_uri": "", "auth_provider_x509_cert_url": "", "client_x509_cert_url": "" }
Tip The content you put just after password: | must all line up at the same tab level. Don’t forget the | after the password: label.
3.2.2. If you are using Docker Hub
Update secret.yaml by entering the following: registry: username: password:
Note ```` and ```` are your credentials to login to Docker Hub. If you use an organization to store your Docker images, this account must be a member of it.
3.3. Create config.yaml
Create a file called config.yaml and choose the following directions based on the registry you are using.
3.3.1. If you are using gcr.io
To configure BinderHub to use gcr.io , simply add the following to your config.yaml file: registry: prefix: gcr.io// enabled: true
Note ```` can be found in the JSON file that you pasted above. It is the text that is in the project_id field. This is the project ID , which may be different from the project name . ```` can be any string, and will be prepended to image names. We recommend something descriptive such as binder-dev or binder-prod .
3.3.2. If you are using Docker Hub
Using Docker Hub is slightly more involved as the registry is not being run by the same platform that runs BinderHub.
Update config.yaml by entering the following: registry: enabled: true prefix: / host: https://registry.hub.docker.com authHost: https://index.docker.io/v1 authTokenUrl: https://auth.docker.io/token?service=registry.docker.io
Note ```` is where you want to store Docker images. This can be your Docker ID account or an organization that your account belongs to. ```` can be any string, and will be prepended to image names. We recommend something descriptive such as binder-dev or binder-prod .
3.4. Install BinderHub
First, get the latest helm chart for BinderHub.: helm repo add jupyterhub https://jupyterhub.github.io/helm-chart helm repo update
Next, install the Helm Chart using the configuration files that you’ve just created. Do this by running the following command: helm install jupyterhub/binderhub --version=v0.1.0-85ac189 --name= --namespace= -f secret.yaml -f config.yaml
Note --version refers to the version of the BinderHub Helm Chart . name and namespace may be different, but we recommend using the same name and namespace to avoid confusion. We recommend something descriptive and short, such as binder . If you run kubectl get pod --namespace= you may notice the binder pod in CrashLoopBackoff . This is expected, and will be resolved in the next section.
This installation step will deploy both a BinderHub and a JupyterHub, but they are not yet set up to communicate with each other. We’ll fix this in the next step. Wait a few moments before moving on as the resources may take a few minutes to be set up.
3.5. Connect BinderHub and JupyterHub
In the google console, run the following command to print the IP address of the JupyterHub we just deployed.: kubectl --namespace= get svc proxy-public
Copy the IP address under EXTERNAL-IP . This is the IP of your JupyterHub. Now, add the following lines to config.yaml file: hub: url: http://
Next, upgrade the helm chart to deploy this change: helm upgrade jupyterhub/binderhub --version=v0.1.0-85ac189 -f secret.yaml -f config.yaml
3.6. Try out your BinderHub Deployment
If the helm upgrade command above succeeds, it’s time to try out your BinderHub deployment.
First, find the IP address of the BinderHub deployment by running the following command: kubectl --namespace= get svc binder
Note the IP address in EXTERNAL-IP . This is your BinderHub IP address. Type this IP address in your browser and a BinderHub should be waiting there for you.
You now have a functioning BinderHub at the above IP address.
3.7. Increase your GitHub API limit
Note
Increasing the GitHub API limit is not strictly required, but is recommended before sharing your BinderHub URL with users.
By default GitHub only lets you make 60 requests each hour. If you expect your users to serve repositories hosted on GitHub, we recommend creating an API access token to raise your API limit to 5000 requests an hour. Create a new token with default (check no boxes) permissions here . Store your new token somewhere secure (e.g. keychain, netrc, etc.) Before running your BinderHub server, run the following: export GITHUB_ACCESS_TOKEN=
BinderHub will automatically use the token stored in this variable when making API requests to GitHub. See the GitHub authentication documentation for more information about API limits.
For next steps, see Debugging BinderHub and Tear down your Binder deployment .
大数据
2018-07-26 16:39:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
测试数据: Java编程基础 88 Java高级应用 99
Maven依赖 dom4j dom4j 1.6.1 jaxen jaxen 1.1.6
测试代码: import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.junit.Test; import java.io.File; import java.util.List; /** * Created by Administrator on 2018/7/7 17:14 in Beijing. */ public class Dom4jTest { @Test public void demo1() throws DocumentException { SAXReader saxReader = new SAXReader(); Document document = saxReader.read(new File("./src/main/resources/books.xml")); //得到根元素 Element rootElement = document.getRootElement(); List bookList1 = rootElement.elements(); for (Element book : bookList1) { System.out.println(book.element("name").getText() + " ---- "+ book.element("price").getText()); } System.out.println(); System.out.println(rootElement.selectSingleNode("//name").getText()); //获取的是第一个 System.out.println(); //使用Xpath方式 List bookList2 = rootElement.selectNodes("//book"); for (Element book : bookList2) { System.out.println(book.element("name").getText() + " ---- "+ book.element("price").getText()); } } }
输出: Java编程基础 ---- 88 Java高级应用 ---- 99 Java编程基础 Java编程基础 ---- 88 Java高级应用 ---- 99
大数据
2018-07-26 16:33:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
先简单介绍下HS1和HS2的主要区别:
HiveServer1:
可以看到HS的进程和MetaStore的进程是在一个虚拟机里面的,而且从图中可以看出,一个HS服务同时只能提供一个访问连接。

HiveServer2:
HS2的主要改进是把MetaStoreServer从Hiveserver中剥离出来了,形成一个单独的进程运行,而且hiveserver和metastore server可以同时服务 于多个客户端(Beeline CLI,Hive CLI,HCatalog等)。


配置Hiveserver2的访问协议,http或者tcp

hive.server2.transport.mode
binary
Server transport mode. "binary" or "http" .


对应http协议的访问端口

hive.server2.thrift.http.port
10001
Port number when in HTTP mode.


对应tcp协议的访问端口

hive.server2.thrift.port
10001
Port number when in TCP mode.


对应在http协议下线程池的线程数

hive.server2.thrift.http.min.worker.threads
5
Minimum number of worker threads when in HTTP mode.


hive.server2.thrift.http.max.worker.threads
500
Maximum number of worker threads when in HTTP mode.


对应在tcp协议下的线程池的线程数:

hive.server2.thrift.min.worker.threads
5
Minimum number of worker threads when in TCP mode.


hive.server2.thrift.max.worker.threads
500
Maximum number of worker threads when in TCP mode.


配置metastore server的地址

hive.metastore.uris

Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.


配置异步线程池的线程数

hive.server2.async.exec.threads
100
Number of threads in the async thread pool for HiveServer2


配置异步线程结束的超时时间(超过这个时间HS会退出):

hive.server2.async.exec.shutdown.timeout
10
Time (in seconds) for which HiveServer2 shutdown will wait for async
threads to terminate



配置异步线程的等待超时时间(超过这个值线程会被回收)

hive.server2.async.exec.keepalive.time
10
Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait
for a new task to arrive before terminating




hive.server2. long .polling.timeout
5000L
Time in milliseconds that HiveServer2 will wait, before responding to asynchronous calls that use long polling


配置请求缓冲队列的长度

hive.server2.async.exec.wait.queue.size
100
Size of the wait queue for async thread pool in HiveServer2.
After hitting this limit, the async thread pool will reject new requests.


大数据
2018-07-23 16:43:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
CDH的坑之Sqoop导出数据到MySQL
最近使用Sqoop从Hive导出数据到MySQL中,出现了一系列的问题,下面将这个问题记录一下,避免再度踩坑!
导出语句 sqoop export --connect jdbc:mysql://192.168.1.78:3306/data \ --username root \ -P \ --export-dir '/user/hive/warehouse/personas.db/user_attribute/000000_0' \ --table dm_user_attribute \ --input-fields-terminated-by '|' \ --input-null-non-string '\\N' \ --input-null-string '\\N' \ --lines-terminated-by '\n' \ -m 1
运行环境
centOS7+CDH5.7.2+其中集成的Sqoop
错误信息
以下是我输入命令到服务器中,控制台打印的信息。 Warning: /opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/bin/../lib/sqoop/../accumulo does not exist! Accumulo imports will fail. Please set $ACCUMULO_HOME to the root of your Accumulo installation. 18/07/23 11:54:45 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.7.2 18/07/23 11:54:45 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead. 18/07/23 11:54:45 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. 18/07/23 11:54:45 INFO tool.CodeGenTool: Beginning code generation 18/07/23 11:54:45 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dm_user_attribute` AS t LIMIT 1 18/07/23 11:54:45 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `dm_user_attribute` AS t LIMIT 1 18/07/23 11:54:45 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce Note: /tmp/sqoop-root/compile/2322b82e8ef7190a66357528d5fbddae/dm_user_attribute.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 18/07/23 11:54:47 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/2322b82e8ef7190a66357528d5fbddae/dm_user_attribute.jar 18/07/23 11:54:47 INFO mapreduce.ExportJobBase: Beginning export of dm_user_attribute 18/07/23 11:54:47 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar 18/07/23 11:54:47 INFO Configuration.deprecation: mapred.map.max.attempts is deprecated. Instead, use mapreduce.map.maxattempts 18/07/23 11:54:48 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative 18/07/23 11:54:48 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative 18/07/23 11:54:48 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps 18/07/23 11:54:48 INFO client.RMProxy: Connecting to ResourceManager at 192.168.1.152:8032 18/07/23 11:54:49 INFO input.FileInputFormat: Total input paths to process : 1 18/07/23 11:54:49 INFO input.FileInputFormat: Total input paths to process : 1 18/07/23 11:54:49 INFO mapreduce.JobSubmitter: number of splits:1 18/07/23 11:54:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1528444677205_1338 18/07/23 11:54:50 INFO impl.YarnClientImpl: Submitted application application_1528444677205_1338 18/07/23 11:54:50 INFO mapreduce.Job: The url to track the job: http://daojia02:8088/proxy/application_1528444677205_1338/ 18/07/23 11:54:50 INFO mapreduce.Job: Running job: job_1528444677205_1338 18/07/23 11:54:55 INFO mapreduce.Job: Job job_1528444677205_1338 running in uber mode : false 18/07/23 11:54:55 INFO mapreduce.Job: map 0% reduce 0% 18/07/23 11:55:00 INFO mapreduce.Job: map 100% reduce 0% 18/07/23 11:55:01 INFO mapreduce.Job: Job job_1528444677205_1338 failed with state FAILED due to: Task failed task_1528444677205_1338_m_000000 Job failed as tasks failed. failedMaps:1 failedReduces:0 18/07/23 11:55:01 INFO mapreduce.Job: Counters: 8 Job Counters Failed map tasks=1 Launched map tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=2855 Total time spent by all reduces in occupied slots (ms)=0 Total time spent by all map tasks (ms)=2855 Total vcore-seconds taken by all map tasks=2855 Total megabyte-seconds taken by all map tasks=2923520 18/07/23 11:55:01 WARN mapreduce.Counters: Group FileSystemCounters is deprecated. Use org.apache.hadoop.mapreduce.FileSystemCounter instead 18/07/23 11:55:01 INFO mapreduce.ExportJobBase: Transferred 0 bytes in 13.576 seconds (0 bytes/sec) 18/07/23 11:55:01 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 18/07/23 11:55:01 INFO mapreduce.ExportJobBase: Exported 0 records. 18/07/23 11:55:01 ERROR tool.ExportTool: Error during export: Export job failed!
当我看到这个控制台打印的信息时,犹如一万只草泥马狂奔而过,这是什么鬼?只告诉你导出失败,任务中断了,错误信息呢?你看到是不是也是一样的感觉呢?这该如何解决?从何入手呢?
Sqoop的错误日志
经过两天的各种搞头,最后终于知道了如何解决这个问题,这个问题不是具体的问题,但是想要知道具体的错误信息,在控制台是看不到的,只能到CDH的web管理界面去看,如下就告诉大家CDH的管理界面怎么找到Sqoop的这个任务日志。
第一步
如下图:点击YAEN进入YARN的详情界面。有人会问,为什么不是Sqoop的界面,Sqoop最终会转化为MR进行任务的执行,所以这里要看Sqoop的任务执行情况,还是要到YARN的详情界面去看。
第二步
如下图为YARN的详情界面,需要点击应用程序目录,进入任务的执行结果列表中,可以看到各个执行的任务,以及执行的结果,下图明显看到有一个错误。根据如下的操作进入下一个页面。
第三步
这个界面展示了单个任务的还算详细的任务信息,不过这不是我们最终要找的界面,看到如下图框起来的logs超链接字段,点击进入下一个页面。
第四步
看到这个界面,好像是找到了日志的界面,对不起,还没有,向下拉,你会看到如图的字样,这个页面只是展示了任务执行的流程,具体的错误信息还在另外一个页面。点击如图here超链接的字样,进入下一个页面。
第五步
经过前面的几个页面,我们终于进入了我们想要看到的页面,我们亲爱的错误页面,在这里,就可以看到这个任务的错误原因,这样就可以根据错误信息解决问题了。这个页面展示的错误信息的解决方法,网上基本都有,可以根据错误信息自行查找了。
本人这里展现的问题,是因为Hive和MySQL的时间字段不匹配导致的,这里更改MySQL或者Hive的时间字段类型,让两边的类型保持一致,即可解决问题。
真的没想到,CDH会这么坑,这个问题,整整折磨了我两天,不过还好,最终还是解决了,以后再遇到之后,就会可以立即解决了。
上一篇: Centos 7+CDH5.7.2全部署流程
下一篇: CDH的坑之Deploy Client Configuration Failed
大数据
2018-07-23 16:03:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Sqoop使用手册
转载请注明出处 : http://www.cnblogs.com/xiaodf/ 1 Sqoop概述 2 版本说明 3 驱动安装 3.1 MySQL 4 基本用法 4.1 导入 4.1.1 保护密码 4.1.2 使用其他文件格式 4.1.3 压缩导入的数据 4.1.4 提高传输速度 4.1.5 自定义类型映射 4.1.6 并行控制 4.1.7 对NULL值进行编码 4.1.8 导入所有表 4.2 增量导入 4.2.1 只导入细腻数据 4.2.2 增量导入可变数据 4.2.3 保存last-value 4.2.4 在metastore中保存密码 4.2.5 客户端之间共享metastore 4.3 导出 4.3.1 hive导出数据到mysql 4.3.2 批量插入 4.3.3 导出所有数据或不导出任何数据 4.3.4 更新已有数据 4.3.5 更新或插入数据 4.3.6 只导出某些列 4.3.7 编码NULL值 5 附:import和export参数详解 5.1 import和export工具通用选项 5.2 数据导入工具import参数详解 5.3 数据导出工具export参数详解 5.4 Sqoop Job参数详解

1 Sqoop概述
Sqoop是一个旨在Hadoop和关系数据库或主机之间传输数据的工具。你可以使用Sqoop从关系型数据库管理系统(RDBMS),如MySQL、Oracle或大型机到Hadoop分布式文件系统(HDFS)导入数据,在Hadoop中的MapReduce转换数据,然后将数据导出到一个RDBMS 。Sqoop使用MapReduce导入和导出数据,它提供了并行操作,以及容错。

2 版本说明
本文档适用于Sqoop1。
软件 版本
Sqoop 1.4.5-cdh5.3.0
Mysql JDBC驱动
ORACLE JDBC驱动
5.1.32
和oracle版本保持一致

3 驱动安装

3.1 MySQL
将驱动对应的jar包放置到 /usr/share/java/下。
如:/usr/share/java/mysql-connector-java-5.1.32-bin.jar
4 基本用法

4.1 导入 sqoop import \ --connect JDBC_URL \ --username USERNAME \ --password PASSWORD \ --table TABLE \ --hive-import \ --split-by SPLIT_BY_COL \ --num-mappers N \ --hive-database HIVE_DB_NAME \ --hive-table HIVE_TABLE_NAME \ (--hive-partition-key partition_name \ --hive-partition-value partititon_value \ 附:如果是分区表则需指出分区名称和分区值) --columns col1,clo2,col3… \ --warehouse-dir /user/hive/warehouse/ \ --fields-terminated-by ‘|’ \ --direct \ --compress
参数说明
参数 说明
--connect 连接RDBMS的jdbc连接字符串。
示例:--connect jdbc:mysql:// MYSQL_SERVER: PORT / DBNAME
其中:mysql默认端口号为3306;
示例:--connect jdbc:oracle:thin: USERNAME/PASSWORD@ORACLE_SERVER :PORT: SID
其中:Oracle默认端口号为1521;
thin:是驱动方式,“瘦”的意思,直接使用原生的Oracle JDBC驱动;
SID:是一个数据库的唯一标识符,是建立一个数据库时系统自动赋予的一个初始ID。
--username 连接RDBMS所使用的用户名。
--password 连接RDBMS所使用的密码。
--table 将要导入到hive的表。
--split-by 分割导入任务所使用的字段。需要明确指定,推荐使用主键。
--hive-import 插入数据到hive当中,使用hive默认的分隔符。
-m, --num-mappers< n> 使用n个map任务并行导入数据。是指生成的map任务的总数量,
不是同时处于RUNNING状态的数量。
--hive-database hive当中的数据库。
-- hive-table hive当中的表名
--hive-partition-key hive分区的列名 。
--hive-partition-value hive分区的值。
--columns < col,col,col…> 从表中导出指定的一组列的数据,用逗号分隔,
oracle中列名需要大写。
--warehouse-dir (必选)可以指定为-warehouse-dir/user/hive/warehouse/
即导入数据的存放路径,如果该路径不存在,会首先创建。
在该路径下会生成和TABLE(--table)同名的文件夹,该文件夹下存放数据文件。
如果路径存在,需要保证该文件夹下不存在与TABLE(--table)同名文件。
如果不手动指定,默认是启动sqoop任务的用户的home目录。
--direct 使用快速模式导入
--compress 启用压缩,生成的文件为经过压缩的文件。
默认使用GZIP算法。
通过--compression-codec设置压缩算法。
通常当空间不够时可以使用压缩,不过需要注意,如果压缩率过大可能导致CPU占用过高。
如果可以,推荐使用snappy。
另外,如果配置了mapreduce的“map输出压缩”,
那么即使不适用—compress开关,
导入的数据文件也会使用对应的codec进行压缩。
--compression-codec
其他
使用Hadoop Codec。(默认gzip)前提是设置了—cpmpress。
可使用 sqoop import 命令查看帮助。
附:对于--warehouse-dir需要指定为/user/hive/warehouse/但在该路径下不能存在与TABLE(--table)同名的文件,否则导入失败。当导入成功时,会在该路径下生成数据文件part-m-XXXXX并且生成与TABLE(--table)同名文件,存放导入成功的标志文件_SUCCESS。

4.1.1 保护密码
在sqoop命令中显式指定密码会是很不安全的操作,使用操作系统的列出正在执行的命令的方式可以很容易的获取到密码。有两种方式可以解决这个问题。
方式一:使用-P(大写)参数,在执行命令时再输入密码。
方式二:使用--password-file参数,即将密码存放在参数指定的文件中。
4.1.2 使用其他文件格式
Sqoop支持3中不同的文件格式,其中一种是文本格式,两外两种是二进制格式。二进制格式分别是Avro和SequenceFile。使用--as-avrodatafile或--as-sequencefile以指定具体使用哪种二进制格式。
4.1.3 压缩导入的数据
使用—compress或-z参数以压缩导入之后的数据。默认的压缩算法为GZip,所有文件的后缀名均为.gz。可以使用—compress-codec来指定其他的codec。如 --compression-codec org.apache.hadoop.io.compress.BZip2Codec
使用压缩需要将mapreduce中对应的设置项开启,如mapreduce.output.
compress。
4.1.4 提高传输速度
不同于JDBC接口,direct模式下使用数据库提供的本地工具进行数据传输。在MySQL中使用mysqldump和mysqlimport。对于PostgreSQL,sqoop会使用pg_dump工具来导入数据。使用本地工具会极大提高性能,因为他们针对数据传输做了优化,以降低数据库服务器的负担。当然也有很多限制,比如并不是所有的数据库都提供本地工具。目前sqoop的direct模式只支持MySQL和PostgreSQL。
4.1.5 自定义类型映射
使用—amp-column-java参数来将列列映射到java类以覆盖sqoop提供的默认的映射关系。
如要将c1、c2、c3分别映射为Float、String、String,对应的设置如下所示。 sqoop import --map-column-java c1=Float,c2=String,c3=String ...

4.1.6 并行控制
Sqoop默认使用4个并发的map任务来项hadoop传输数据。当数据量比较大时可以考虑增加并发执行的map任务的数量以提高传输速度。使用参数—num-mappers来控制map任务的数量。
4.1.7 对NULL值进行编码
Sqoop使用“null”字符串来代替数据库中的NULL值。对于文本类型的列,使用—null-string来设置替代NULL值得字符串,对于其他类型的列,则使用—null-non-string来设置。
如果想使用\N来编码NULL值,则对应sqoop命令中的值为\N, \在JAVA中是转义字符。 --null-string '\\N' \ --null-non-string '\\N'

4.1.8 导入所有表
使用如下命令导入所有表。sqoop会一次导入每张表,以避免对数据库服务器造成额外的负担。 sqoop import-all-tables \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop

4.2 增量导入
增量导入是每次只导入新增加的或者需要更新的数据。增量导入会大大降低数据库服务器的负担。
4.2.1 只导入细腻数据
假设我们有INTEGER类型的主键,并且只追加新列,并且需要定期将表的状态同步到Hadoop中。我们需要使用增量导入的功能。典型代码如下所示。 sqoop import \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table visits \ --incremental append \ --check-column id \ --last-value 1
--incremental参数表示使用增量导入,参数值为使用的增量导入的类型。,由于我们只添加而不修改列的值,所以使用append。增量导入中还需要额外的两个参数:--check-column指定一个列,用于检查新增加的数据,--last-value包含上次成功导入到Hadoop的值。
4.2.2 增量导入可变数据
使用lastmodified模式而不是append模式来导入变化的数据。例如使用如下命令导入所last_update_date列大于“2013-05-22 01:01:01”的行。 sqoop import \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table visits \ --incremental lastmodified \ --check-column last_update_date \ --last-value "2013-05-22 01:01:01"
增量导入模式lastmodified需要一个日期(可选的类型有date、time、datetime和timestamp)列来保存列被修改的时间。
注:--last-value的值在增量导入时是包含在需要导入的范围之内的。
Sqoop不会检查数据是否重复,即不会按照MySQL中那样更新数据。
4.2.3 保存last-value
Sqoop导入任务完成后会给出新的last-value,我们可以保存当前下来,以便下次使用。
Sqoop的metastore会保存任务的信息,所以我们创建对应的任务即可。 sqoop job \ --create visits \ -- \ import \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table visits \ --incremental append \ --check-column id \ --last-value 0
执行任务 sqoop job --exec visits
删除任务 sqoop job --delete visits
查看任务信息,信息中会包含last-value的当前值。 sqoop job --show visits

4.2.4 在metastore中保存密码
很不幸,每次使用sqoop job执行任务都需要手动输入密码。
解决方式有两种:
第一种方式,使用password-file(“导入”一章中有介绍);
第二种方式,在sqoop-site.xml中添加如下属性即可(添加后第一次仍然需要输入密码 )。 sqoop.metastore.client.record.password true

4.2.5 客户端之间共享metastore
启动metastore服务 sqoop metastore
客户端连接到metastore服务 sqoop job --create visits \ --meta-connect \ jdbc:hsqldb:hsql://metastore.example.com:16000/sqoop \ -- \ import \ --table visits ...
显示任务 sqoop job --list --meta-connect jdbc:hsqldb:hsql://metastore.example.com:16000/sqoop

4.3 导出

4.3.1 hive导出数据到mysql sqoop export --connect jdbc:mysql://MYSQL_SERVER:PORT/DBNAME \ --username USERNAME \ --table TABLE \ --export-dir /user/hive/warehouse/HIVE_TABLE_NAME/ \ --num-mappers N \ --fields-terminated-by ','
附:对于hive中有null时,导入到MySQL中依然是NULL。
4.3.2 批量插入
sqoop使用独立的insert语句来添加每一行记录,使用如下的方式可以每次插入多条记录。即批量插入,每次插入10条。 sqoop export \ -Dsqoop.export.records.per.statement=10 \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --export-dir cities \ --batch

4.3.3 导出所有数据或不导出任何数据
我们需要确保或者所有数据均导出成功,或者不导出任何数据。为此我们使用临时表,即先将数据导出到临时表(staging-table)中,然后再转移到目标表中。 sqoop export \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --staging-table staging_cities

4.3.4 更新已有数据
使用—update-key参数指定一列,该列可以识别出被修改的行,通常是表中的主键。例如下面的示例中使用列id来更新cities表。 sqoop export \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --update-key id
可以指定多个列,列之间用逗号隔开。
注意,该操作只会更新已有的数据,不会插入新的数据,
4.3.5 更新或插入数据
使用如下的参数可以同时插入新数据或更新已有数据。 sqoop export \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --update-key id \ --update-mode allowinsert

4.3.6 只导出某些列
Sqoop默认hdfs中的数据和数据库中有相同数量的列并且顺序相同,使用—columns参数可以指定需要导出到数据库中的列或者指定导出列之间的顺序。如只导出coutry和city列,就可以使用如下示例。 sqoop export \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --columns country,city

4.3.7 编码NULL值
你可以使用—input-null-string和—input-null-no-string参数来覆盖NULL值的替换字符串,例如使用‘\N’替换NULL值。 sqoop export \ --connect jdbc:mysql://mysql.example.com/sqoop \ --username sqoop \ --password sqoop \ --table cities \ --input-null-string '\\N' \ --input-null-non-string '\\N'

5 附:import和export参数详解

5.1 import和export工具通用选项
选项 说明
--connect < jdbc-uri > 指定JDBC连接字符串
--connection-manager < class-name > 指定要使用的连接管理器类
--driver < class-name > 指定要使用的JDBC驱动类
--hadoop-mapred-home < dir > 指定$HADOOP_MAPRED_HOME路径
--help 打印用法帮助信息
--password-file 设置用于存放认证的密码信息文件的路径
-P 从控制台读取输入的密码
--password < password > 设置认证密码
--username < username > 设置认证用户名
--verbose
--connection-param-file < filename >
打印详细的运行信息
可选,指定存储数据库连接参数的属性文件

5.2 数据导入工具import参数详解
import工具,是将HDFS平台外部的结构化存储系统中的数据导入到Hadoop平台,便于后续分析。我们先看一下import工具的基本选项及其含义,如下表所示:
选项 说明
--append 将数据追加到HDFS上一个已存在的数据集上
--as-avrodatafile 将数据导入到Avro数据文件
--as-sequencefile 将数据导入到SequenceFile
--as-textfile 将数据导入到普通文本文件(默认)
--boundary-query < statement > 边界查询,用于创建分片(InputSplit)
--columns < col,col,col…> 从表中导出指定的一组列的数据
--delete-target-dir 如果指定目录存在,则先删除掉
--direct 使用直接导入模式(优化导入速度)
--direct-split-size < n > 分割输入stream的字节大小(在直接导入模式下)
--fetch-size < n > 从数据库中批量读取记录数
--inline-lob-limit < n > 设置内联的LOB对象的大小
-m,--num-mappers < n > 使用n个map任务并行导入数据
-e,--query < statement > 导入的查询语句
--split-by < column-name > 指定按照哪个列去分割数据
--table < table-name > 导入的源表表名
--target-dir < dir > 导入HDFS的目标路径
--warehouse-dir < dir > HDFS存放表的根路径
--where < where clause> 指定导出时所使用的查询条件
-z,--compress 启用压缩
--compression-codec < c > 指定Hadoop的codec方式(默认gzip)
--null-string < null-string >
--null-non-string < null-string >
如果指定列为字符串类型,使用指定字符串替换值为null的该类列的值
如果指定列为非字符串类型,使用指定字符串替换值为null的该类列的值
hive参数
选项 说明
--hive-home < dir > Override $HIVE_HOME
--hive-import 插入数据到hive当中,使用hive的默认分隔符
--hive-overwrite 覆盖hive表中的数据
--create-hive-table 建表,如果表已经存在,该操作会报错
--hive-table < table-name > 设置到hive当中的表名
--hive-drop-import-delims 导入到hive时删除 \n, \r, and \01
--hive-delims-replacement 导入到hive时用自定义的字符替换掉\n, \r, and \01
--hive-partition-key hive分区的key
--hive-partition-value < v >
--map-column-hive < map >
hive分区的值
类型匹配,sql类型对应到hive类型
HBase参数
选项 说明
--column-family < family > 把内容导入到hbase当中,默认是用主键作为split列
--hbase-create-table 创建Hbase表
--hbase-row-key < col >
--hbase-table < table-name >
指定字段作为row key ,如果输入表包含复合主键,用逗号分隔
指定hbase表

5.3 数据导出工具export参数详解
export工具,是将HDFS平台的数据,导出到外部的结构化存储系统中,可能会为一些应用系统提供数据支持。我们看一下export工具的基本选项及其含义,如下表所示:
选项 说明
--validate < class-name > 启用数据副本验证功能,仅支持单表拷贝,可以指定验证使用的实现类
--validation-threshold < class-name > 指定验证门限所使用的类表
--direct 使用直接导出模式(优化速度)
--export-dir < dir > 导出过程中HDFS源路径
-m,--num-mappers < n > 使用n个map任务并行导出
--table < table-name > 导出的目的表名称
--call < stored-proc-name > 导出数据调用的指定存储过程名
--update-key col-name > 更新参考的列名称,多个列名使用逗号分隔
--update-mode < mode > 指定更新策略,包括:updateonly(默认)、allowinsert
--input-null-string < null-string > 使用指定字符串,替换字符串类型值为null的列
--input-null-non-string < null-string > 使用指定字符串,替换非字符串类型值为null的列
--staging-table < staging-table-name> 在数据导出到数据库之前,数据临时存放的表名称
--clear-staging-table
--batch
清除工作区中临时存放的数据
使用批量模式导出

5.4 Sqoop Job参数详解
选项 说明
--create < job-id > 定义sqoop job
--delete < job-id> 删除sqoop job
--exec < job-id> 执行sqoop job
--show < job-id>
--list
查看sqoop job状态
查看所有sqoop job
分类: Sqoop系列
大数据
2018-07-25 14:51:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.最大值 /** * 统计最大值 */ public static void max() { // 获取age字段的最大值 AggregationBuilder aggregation = AggregationBuilders.max("maxAge").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Max max = response.getAggregations().get("maxAge"); System.out.println(max.getValue()); }
2.最小值 /** * 统计最小值 */ public static void min() { // 获取age字段的最小值 AggregationBuilder aggregation = AggregationBuilders.min("minAge").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Min min = response.getAggregations().get("minAge"); System.out.println(min.getValue()); }
3.平均值 /** * 统计平均值 */ public static void avg() { // 获取age字段的平均值 AggregationBuilder aggregation = AggregationBuilders.avg("avgAge").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Avg avg = response.getAggregations().get("avgAge"); System.out.println(avg.getValue()); }
4.求和统计 /** * 统计就和 */ public static void sum() { // 获取age字段的求和 AggregationBuilder aggregation = AggregationBuilders.sum("sumAge").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Sum sum = response.getAggregations().get("sumAge"); System.out.println(sum.getValue()); }
5.基本统计 /** * 基本统计 */ public static void stats() { // 基本统计 AggregationBuilder aggregation = AggregationBuilders.stats("aggStats").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Stats stats = response.getAggregations().get("aggStats"); System.out.println(stats.toString()); }
返回结果 {"aggStats":{"count":6,"min":20.0,"max":28.0,"avg":22.166666666666668,"sum":133.0}}
6.高级统计 /** * 高级统计 */ public static void extendedStats() { // 高级统计 AggregationBuilder aggregation = AggregationBuilders.extendedStats("exStats").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).execute().actionGet(); ExtendedStats exStats = response.getAggregations().get("exStats"); System.out.println(exStats.toString()); }
执行结果 {"exStats":{"count":6,"min":20.0,"max":28.0,"avg":22.166666666666668,"sum":133.0,"sum_of_squares":2997.0,"variance":8.138888888888914,"std_deviation":2.8528737947706193,"std_deviation_bounds":{"upper":27.872414256207907,"lower":16.46091907712543}}}
7.基数统计 /** * 基数统计 */ public static void cardinality() { // 基数统计 AggregationBuilder aggregation = AggregationBuilders.cardinality("cardinality").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).get(); Cardinality cardinality = response.getAggregations().get("cardinality"); System.out.println(cardinality.getValue()); }
8.文档数量统计 /** * 文档数量统计 */ public static void valueCount() { // 文档数量统计 AggregationBuilder aggregation = AggregationBuilders.count("valueCount").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).execute().actionGet(); ValueCount valueCount = response.getAggregations().get("valueCount"); System.out.println(valueCount.getValue()); }
9.百分位统计 /** * 百分位统计 */ public static void percentiles() { // 百分位统计 AggregationBuilder aggregation = AggregationBuilders.percentiles("percentiles").field("age"); SearchResponse response = getClient().prepareSearch("my_person").addAggregation(aggregation).execute() .actionGet(); Percentiles percentiles = response.getAggregations().get("percentiles"); for (Percentile percentile : percentiles) { System.out.println("percent="+percentile.getPercent() + ",value=" + percentile.getValue()); } }
执行结果 percent=1.0,value=20.0 percent=5.0,value=20.0 percent=25.0,value=20.0 percent=50.0,value=21.0 percent=75.0,value=22.75 percent=95.0,value=26.75 percent=99.0,value=27.75
大数据
2018-07-24 21:45:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.constant_score 查询
2.bool 查询
多条件查询,当过个条件满足时返回文档,bool查询由一个或多个字句组成
occurrence 描述
must 返回的文档必须满足must子句的条件,并且参与计算分值
filter 【filter以前时单独的query DSL,现在归入bool query】;子句(查询)必须出现在匹配的文档中。然而,不同于must查询的是——它不参与分数计算。 Filter子句在过滤器上下文(filter context)中执行,这意味着score被忽略并且子句可缓存【所以filter可能更快】
should
must_not
“权限”比must/filter低。如果没有must或者filter,有一个或者多个should子句,那么只要满足一个就可以返回。minimum_should_match参数定义了至少满足几个子句。
返回的文档必须不满足must_not定义的条件
/** * 多条件查询 */ public static void boolQuery() { QueryBuilder query = QueryBuilders.boolQuery() .must(QueryBuilders.matchQuery("title", "董事"))//满足“董事”在“title”的分词集合中 .must(QueryBuilders.wildcardQuery("title", "*审*"))//满足“*审*”通配符匹配“title” .filter(QueryBuilders.matchQuery("author", "生物"));//满足“author”匹配“生物” SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { boolQuery(); }
执行结果: { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
3.dis_max查询
4.function_score查询
5.boosting查询
大数据
2018-07-24 20:31:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Version :hadoop1.2.1; hbaes0.94.16;
HBase写入数据方式(参考:《HBase The Definitive Guide》),可以简单分为下面几种:
1. 直接使用HTable进行导入,代码如下: package hbase.curd; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; public class PutExample { /** * @param args * @throws IOException */ private HTable table = HTableUtil.getHTable("testtable"); public static void main(String[] args) throws IOException { // TODO Auto-generated method stub PutExample pe = new PutExample(); pe.putRows(); } public void putRows(){ List puts = new ArrayList(); for(int i=0;i<10;i++){ Put put = new Put(Bytes.toBytes("row_"+i)); Random random = new Random(); if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual1_value_"+i)); } puts.add(put); } try{ table.put(puts); table.close(); }catch(Exception e){ e.printStackTrace(); return ; } System.out.println("done put rows"); } }
其中HTableUtil如下: package hbase.curd; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; public class HTableUtil { private static HTable table; private static Configuration conf; static{ conf =HBaseConfiguration.create(); conf.set("mapred.job.tracker", "hbase:9001"); conf.set("fs.default.name", "hbase:9000"); conf.set("hbase.zookeeper.quorum", "hbase"); try { table = new HTable(conf,"testtable"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static Configuration getConf(){ return conf; } public static HTable getHTable(String tablename){ if(table==null){ try { table= new HTable(conf,tablename); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return table; } public static byte[] gB(String name){ return Bytes.toBytes(name); } }
这一种是没有使用MR的,下面介绍的几种方式都是使用MR的。
2.1 从HDFS文件导入HBase,继承自Mapper,代码如下: package hbase.mr; import java.io.IOException; import hbase.curd.HTableUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class ImportFromFile { /** * 从文件导入到HBase * @param args */ public static final String NAME="ImportFromFile"; public enum Counters{LINES} static class ImportMapper extends Mapper{ private byte[] family =null; private byte[] qualifier = null; @Override protected void setup(Context cxt){ String column = cxt.getConfiguration().get("conf.column"); byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); family = colkey[0]; if(colkey.length>1){ qualifier = colkey[1]; } } @Override public void map(LongWritable offset,Text line,Context cxt){ try{ String lineString= line.toString(); byte[] rowkey= DigestUtils.md5(lineString); Put put = new Put(rowkey); put.add(family,qualifier,Bytes.toBytes(lineString)); cxt.write(new ImmutableBytesWritable(rowkey), put); cxt.getCounter(Counters.LINES).increment(1); }catch(Exception e){ e.printStackTrace(); } } } private static CommandLine parseArgs(String[] args){ Options options = new Options(); Option o = new Option("t" ,"table",true,"table to import into (must exist)"); o.setArgName("table-name"); o.setRequired(true); options.addOption(o); o= new Option("c","column",true,"column to store row data into"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("i", "input", true, "the directory or file to read from"); o.setArgName("path-in-HDFS"); o.setRequired(true); options.addOption(o); options.addOption("d", "debug", false, "switch on DEBUG log level"); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = HTableUtil.getConf(); String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); String table = cmd.getOptionValue("t"); String input = cmd.getOptionValue("i"); String column = cmd.getOptionValue("c"); conf.set("conf.column", column); Job job = new Job(conf, "Import from file " + input + " into table " + table); job.setJarByClass(ImportFromFile.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(input)); System.exit(job.waitForCompletion(true) ? 0 : 1); } private static String[] initialArg(){ String []args = new String[6]; args[0]="-c"; args[1]="fam:data"; args[2]="-i"; args[3]="/user/hadoop/input/picdata"; args[4]="-t"; args[5]="testtable"; return args; } }
2.2 读取HBase表写入HBase表中字段,代码如下: package hbase.mr; import hadoop.util.HadoopUtils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseDriver { /** * 把hbase表中数据拷贝到其他表(或本表)相同字段 * @param args */ enum Counters{ VALID, ROWS, COLS, ERROR } private static Logger log = LoggerFactory.getLogger(ParseDriver.class); static class ParseMapper extends TableMapper{ private byte[] columnFamily =null ; private byte[] columnQualifier =null; @Override protected void setup(Context cxt){ columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 过滤column put.add(columnFamily,columnQualifier,kv.getValue()); cxt.write(row, put); cxt.getCounter(Counters.VALID).increment(1); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); String input ="testtable" ;// String output="testtable1"; // Job job = new Job(conf,"Parse data in "+input+",write to"+output); job.setJarByClass(ParseDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
其中HadoopUtils代码如下: package hadoop.util; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.LineReader; public class HadoopUtils { private static Configuration conf; public static void initialConf(){ conf = new Configuration(); conf.set("mapred.job.tracker", "hbase:9001"); conf.set("fs.default.name", "hbase:9000"); conf.set("hbase.zookeeper.quorum", "hbase"); } public static void initialConf(String host){ conf = new Configuration(); conf.set("mapred.job.tracker", host+":9001"); conf.set("fs.default.name", host+":9000"); conf.set("hbase.zookeeper.quorum", host); } public static Configuration getConf(){ if(conf==null){ initialConf(); } return conf; } public static List readFromHDFS(String fileName) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(URI.create(fileName), conf); FSDataInputStream hdfsInStream = fs.open(new Path(fileName)); // 按行读取(新版本的方法) LineReader inLine = new LineReader(hdfsInStream, conf); Text txtLine = new Text(); int iResult = inLine.readLine(txtLine); //读取第一行 List list = new ArrayList(); while (iResult > 0 ) { list.add(txtLine.toString()); iResult = inLine.readLine(txtLine); } hdfsInStream.close(); fs.close(); return list; } }
2.3 MR和HTable结合,代码如下: package hbase.mr; import hadoop.util.HadoopUtils; import hbase.mr.AnalyzeDriver.Counters; import java.io.IOException; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseSinglePutDriver { /** * 使用HTable进行写入 * 把infoTable 表中的 qualifier字段复制到qualifier1字段 * 单个Put * @param args */ private static Logger log = LoggerFactory.getLogger(ParseMapper.class); static class ParseMapper extends TableMapper{ private HTable infoTable =null ; private byte[] columnFamily =null ; private byte[] columnQualifier =null; private byte[] columnQualifier1 =null; @Override protected void setup(Context cxt){ log.info("ParseSinglePutDriver setup,current time: "+new Date()); try { infoTable = new HTable(cxt.getConfiguration(), cxt.getConfiguration().get("conf.infotable")); infoTable.setAutoFlush(false); } catch (IOException e) { log.error("Initial infoTable error:\n"+e.getMessage()); } columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1")); } @Override protected void cleanup(Context cxt){ try { infoTable.flushCommits(); log.info("ParseSinglePutDriver cleanup ,current time :"+new Date()); } catch (IOException e) { log.error("infoTable flush commits error:\n"+e.getMessage()); } } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null ; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 过滤column put.add(columnFamily,columnQualifier1,kv.getValue()); infoTable.put(put); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String input ="testtable"; byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); byte[] columnQualifier1 = Bytes.toBytes("data1"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1)); conf.set("conf.infotable", input); Job job = new Job(conf,"Parse data in "+input+",into tables"); job.setJarByClass(ParseSinglePutDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); } }
2.4 上面2.3中的HTable其实也是可以put一个List的,下面的方式就是put一个list的方式,这样效率会高。 package hbase.mr; import hadoop.util.HadoopUtils; import hbase.mr.AnalyzeDriver.Counters; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseListPutDriver { /** * 使用HTable进行写入 * List 进行测试,查看效率 * 把infoTable 表中的 qualifier字段复制到qualifier1字段 * @param args */ private static Logger log = LoggerFactory.getLogger(ParseMapper.class); static class ParseMapper extends TableMapper{ private HTable infoTable =null ; private byte[] columnFamily =null ; private byte[] columnQualifier =null; private byte[] columnQualifier1 =null; private List list = new ArrayList(); @Override protected void setup(Context cxt){ log.info("ParseListPutDriver setup,current time: "+new Date()); try { infoTable = new HTable(cxt.getConfiguration(), cxt.getConfiguration().get("conf.infotable")); infoTable.setAutoFlush(false); } catch (IOException e) { log.error("Initial infoTable error:\n"+e.getMessage()); } columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1")); } @Override protected void cleanup(Context cxt){ try { infoTable.put(list); infoTable.flushCommits(); log.info("ParseListPutDriver cleanup ,current time :"+new Date()); } catch (IOException e) { log.error("infoTable flush commits error:\n"+e.getMessage()); } } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null ; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 过滤column put.add(columnFamily,columnQualifier1,kv.getValue()); list.add(put); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String input ="testtable"; byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); byte[] columnQualifier1 = Bytes.toBytes("data2"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1)); conf.set("conf.infotable", input); Job job = new Job(conf,"Parse data in "+input+",into tables"); job.setJarByClass(ParseListPutDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); } }
数据记录条数为:26632,可以看到下面图片中的时间记录对比:

由于结合了hbase,所以需要在hadoop_home/lib目录下面加些额外的包,其整个包如下:
分享,成长,快乐
大数据
2018-07-24 17:52:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Spark Catalyst 的实现分析
转载自: Spark Catalyst 的实现分析
Spark SQL 和 Catalyst 分别对应了 SQL 执行期以及解析期的优化工作,因此 Catalyst 的理解是 Spark SQL 的第一步。在一些 Catalyst 的介绍以及讲座中,下面一张图是必出现,它描述了从 SQL 语句到最后执行 Plan 的生成过程中,除了 Spark SQL,其他 SQL 引擎的工作原理也基本一致,比如 Hive 之类的。
本文核心也是介绍 Catalyst 内部的实现,但是不是按照这张图的步骤来介绍 Catalyst 的实现原理,而是按照 SQL 给人最直接几个概念,比如 Row,Expression,Plan 来逐步介绍它们的内部实现。
看过 Spark SQL 或者 Catalyst 人都知道,相比 Spark Core 的代码,这一块真的挺复杂了,各种算法逻辑,复杂的 Scala 语法元素,以及各种性能优化,代码自动生成,可能得需要几倍的时间,反复的琢磨,才能梳理清楚。
1. Row
Spark SQL 中处理的数据与传统 RDD 最大区别在处理的每行数据的类型表示,传统 RDD 不对每行数据进行类型要求,可以任何复杂数据结构,比如 Map, 或者自己自定义的类之类的,而 Spark SQL 中为 Row 。
Row 的概念是针对之前版本中的 DataFrame 而言的,在 1.6 版本中提出的 Dataset 其实也是有 Row 的概念,只是会被隐式转换掉而已,在 Catalyst 中,处理的对象为 InternalRow ,注意 InternalRow 和 Row 是有本质区别的, Row 是 API 层面的概念,专门针对 DataFrame,而 InternalRow 为内部概念。
Row 类为 DataFrame 中核心概念,它表示一行结构化数据的输入, Row 本身可以有 Scheme,即支持设置每个字段的类型,支持 GetInt 之类的操作;它也可以无 Scheme,即 Scheme=NULL ,支持泛化的 Get 操作,返回 Any 类型数据。 Row 本身就有点像 Array,Tuple ,它的子类 GenericRow 和 GenericRowWithSchema 都是基于 Array[Any] 来实现,它的每个字段的类型为 Any ,因此 Row 可以理解为是无类型约束的,虽然它内部有 Scheme ,但是 Array[Any] 到 Row 的转换,以及泛化的 Get 操作都是不受 Scheme 的限制,因此它是类型不安全的。
在 Spark 1.6 版本以后,提出了 Dataset 概念,它也是 Spark 从 RDD,SchemeRDD,DataFrame 演化过来最为标准化一个概念,相信在未来会是 Spark 中最为核心概念, In the long run, we expect Datasets to become a powerful way to write more efficient Spark applications.
Dataset 是 Spark SQL 中概念,在 API 层面上来说,它没有像 DataFrame 一样,强制要求输入数据为 Row ,或者依赖 Row.fromSeq 等功能函数将输入其他类型的数据强制转换为 Row,但是 Dataset 实际也是要求它处理的数据是 Row 相似的类型,即为 Catalyst 内部的** InternalRow 和 UnsafeRow **的概念。当我们执行 seq(1,2,3).toDs 或者 Rdd.toDs 时候,实际上对每一行记录在内部做了很多转换。DataFrame 也被转换为 Dataset 进行包装,即将 DataFame 表示为 Dataset[Row] 。
Dataset 核心概念是为 Encoder ,它就是我们上面说隐式转换的幕后工具,如下所示 Encoder 内部的 toRow 函数,支持将输入 T 类型转换为 InternalRow : def toRow(t: T): InternalRow = try { inputRow(0) = t extractProjection(inputRow) }
它支持将一个外部的数据类型,转换为 Dataset 内部的 InternalRow ,比如上面的 seq(1,2,3) 就是需要将每个输入的 Int 数字转换为一个 InternalRow , DataFrame 同理需要将 Row 转换为 InternalRow 。在转换的过程中,是 有类型的检查 ,这也是与 DataFrame 最大的区别。
另外 Dataset 对外最为自豪一点就是序列化以及反序列化的性能,用官方话说: Datasets also leverage Tungsten’s fast in-memory encoding,an order of magnitude faster than the default Java serializer ,那么它是这么实现呢?其实他就是上面谈到的 Encoder 的功能,上面虽然我们说它返回的是一个 InternalRow ,实际上, 它返回的是 InternalRow 一个子类,即 UnsafeRow
UnsafeRow 内部是基于 Java 的 unsafe 包来实现(Tungsten 的功能),对 Row 中每个字段的操作都转换为字节的操作,换句话说它底层实际存储结构是 byte[],而且支持 Kryo 序列化,相比使用 Java 序列化工具来序列化数组/Row 之类的复杂数据结构,它的性能肯定要好很多!
另外 InternalRow 还有一个子类,即 MutableRow ,而且 UnsafeRow 也是 MutableRow 的子类,它即为可修改的 InternalRow ,在很多地方都会出现这个,原理很简单,支持 set 等操作而已。
2. Expression
在 SQL 语句中,除了 SELECT FROM 等关键字以外,其他大部分元素都可以理解为 Expression,比如 SELECT sum(a), a ,其中 sum(a) 和 a 都为 Expression;
从 SQL 语句功能来说,Expression 的功能可以划分为 Projection,Predicate,Ordering,Aggregate;
其中 Projection 功能就是 input 数据进行加工,输出为 InternalRow;Predicate 而是对输入的 InternalRow 进行加工输出为 Bool,Ordering 和 Aggregate 则是针对 Sortby/Groupby 专用的两类表达式;你可能会说,那么 substr 之类的函数表达式不在上面归类?substr 在 sql 语句中肯定不会单独存在,他们都是作为子表达式出现在上面的几类当中,比如用于判断或者输出数据的加工。 Expression 是一个 Tree 结构,即可以通过多级的 Child Expression 来组合成复杂的 Expression,比如前面 sum(a) 就是由 sum 和 a 两个简单的 Expression 组合而成,比如更复杂的 Expression 有 max(sum(a), sum(b)) ; Expression 基本的功能是求值,比如 abs(a) , IfNull(A,B) 之类的操作,他们都是对输入 Row 进行加工,并输出处理结果,即 Expression 需要实现 def eval(input: InternalRow = null): Any 函数来实现它的功能。 既然 Expression 的功能是求值,那么它就有输入和输出类型的限制。每个 Expression 都有 def dataType: DataType 类型变量来表示它的输出类型,以及 def checkInputDataTypes(): TypeCheckResult 函数来校验当前 Expression 的输入(为 Tree 结构,那么它的输入即为 Child Expression 输出)是否符合类型要求。 Expression 功能是针对 Row 进行加工,但是可以把加工方法分为以下几种 原生的 def eval(input: InternalRow = null): Any 函数; 对于 Non Child Expression,Expression 的计算是基于 Child Expression 计算结果进行二次加工的,因此对于 Non Child Expression,对 Eval 进行默认实现,子类只需要实现函数 def nullSafeEval(input: Any): Any 即可以,如下所示为只有一个 Child 的 Expression 的实现: override def eval(input: InternalRow): Any = { val value = child.eval(input) if (value == null) { null } else { nullSafeEval(value) } } //比如 ExpressionAbs,就是利用子 Expression 的结果结果之上,进行一个 math.abs 加工。 Projection 类型,它本身不是 Expression,但是它可以根据 N 个 Expression,对输入 row 的 N 个字段分别进行加工,输出一个新的 Row,即 Expression 的容器。 abstract class Projection extends (InternalRow => InternalRow) def apply(input: InternalRow): InternalRow = { val outputArray = new Array(exprArray.length) var i = 0 while (i < exprArray.length) { outputArray(i) = exprArray(i).eval(input) i += 1 } new GenericInternalRow(outputArray) } //比如 row 序列化操作,可以把一个 row 序列化为 unsaferow,相当于一个 Projection Expression 也可能是不支持 eval 的,即 Unevaluable 类型的 Expression,一般有三种情况:1) 是真的无法求值,比如处于 Unresolved 状态的 Expression;2) 是不支持通过 eval 进行求值,而需要通过 gen code 的方式来实现 Expression 功能; 3) Expression 为 RuntimeReplaceable 类型,它仅仅是在 parser 阶段一种临时 Expression,在优化阶段,会被替换为别的 Expression,因此它本身不需要有执行逻辑,但是得有替换相关的逻辑。
Expression 大体归类
Name 归类 功能描述
数据输入: Expression 为 Tree 结构,中间节点都为加工类型表单,而叶子节点即为数据产生节点
Attribute Catalyst 里面最为重要的概念,可以理解为表的属性,在 sql 处理各个阶段会有不同的形态,比如 UnresolvedAttribute->AttributeReference->BoundReference,后面会具体分析
Literal 常量,支持各种类型的常量输入
datetimeExpressions 返回当前时间类型的常量, CurrentDate , CurrentTimestamp
randomExpressions 支持生成一些随机数
其他一些输入 比如获取 sql 计算过程中的任务对应的 InputFileName,SparkPartitionID
基本计算功能:
arithmetic nullSafeEval 数学 Expression,支持 - , + , abs , + , - , * , / , % , max , min , pmod 数学运算符
bitwiseExpressions nullSafeEval 位运算数,支持 IntegralType 类型的 and , or , not , xor 位运算
mathExpressions nullSafeEval 数学函数,支持 cos , Sqrt 之类 30 多种,相当于 Math 包
stringExpressions nullSafeEval 字符串函数,支持 Substring , Length 之类 30 多种,相当于 String 包
decimalExpressions nullSafeEval Decimal 类型的支持,支持 Unscaled , MakeDecimal 操作
datetimeExpressions nullSafeEval 时间类型的运算
collectionOperations nullSafeEval 容器的操作,支持容器 Contains , Sort , Size 三种操作
cast nullSafeEval 支持数据类型的转换
misc nullSafeEval 功能函数包,支持 MD5,crc32 之类的函数功能
基本逻辑计算功能:
predicates eval/nullSafeEval 类型 支持子 Expression 之间的逻辑运算,比如 AND , In , Or ,输出 Bool 类型
regexpExpressions nullSafeEval 支持 LIKE 相关操作
conditionalExpressions eval 支持 case,if,great,least 四种逻辑判断运算
nullExpressions eval/RuntimeReplaceable 与 NULL/NA 相关的判断或者 IF 判断功能,大部分都为 RuntimeReplaceable,会被进行优化处理
其他类型:
complexTypeCreator
Generator
eval
eval
SparkSql 是支持复杂数据结构,比如 Array,Map,Struct,这类 Expression 支持在 sql 语句上生成它们,比如 select array
支持 flatmap 类似的操作,即将 Row 转变为多个 Row,支持 Explode 和自定义 UserDefinedGenerator 两种,其中 Explode 支持将数组和 map 拆开为多个 Row。
2.1 Attribute 详解
Attribute 直译为属性,在 SQL 中,可以简单理解为输入的 Table 中的字段,Attribute 通过 Name 字段来进行命名。SQL 语句通过 Parse 生成 AST 以后,SQL 语句中的每个字段都会解析为 UnresolvedAttribute,它是属于 Attribute 的一个子类,比如 SELECT a 中的 a 就表示为 UnresolvedAttribute("a") ,还有一个特殊的 UnresolvedAttribute,既为 SQL 语句中的 * ,它表示为 Star ,属于 UnresolvedAttribute 类型的子类。
Analyser 需要对 AST 进行进一步的加工,其中有一个很重要的操作就是把整个 AST 中所有 Unresolved 的 Attribute 都转变为 resolved 状态,比如根据输入 Table 将 Star 进行 expand 操作,对应的 Rule 名称为 ResolveReferences ,具体实现细节这里就不展开。
对于 resolve 操作除了将 Star 进行展开以外,它的主要功能就是关联 SQL 语句所有位置用到的 Attribute,即在 Attribute 的 name 基础上,指定一个 ID 进行唯一标示,如果一个 Attribute 在两处被多处被引用,ID 即为同一个 (怎么实现的?Attribute Resolve 操作时从底到顶来遍历整改 AST,每一步都是根据底部已经 resloved 的 Attribute 来给顶部的 Attribute 赋值,从而保证如果两个 Attribute 是指向同一个,它们的 ID 肯定是一样的;对于处于叶子节点 Attribute 是优先进行 resolve 的,比如 Rule:ResolveRelations 对处于底部的 Relation 进行 ResolveRelatition)。可以这么理解,做这些事情都是为了优化,物理存储的 Table 可能有很多 Attribute,而通过 resolve 操作,就指定整个计算过程中需要使用到 Attribute,即可以只从物理存储中读取相应字段,上层各种 Expression 对这些字段都转变为引用,因此 resolve 以后的 Attribute 不是叫做 resolvedAttribute ,而是叫做 AttributeReference 。
对于一个中间节点的 Expression,如果它对一个 Attribute 有引用,比如求一个字段值的长度 length(a) ,这里 a 经过了 UnresolvedAttribute 到 AttributeReference 的转化,但是针对一个输入的 Row,进行 length Expression 计算时,还是无法从 AttributeReference 中读取相应在 Row 中的值,为什么?虽然 AttributeReference 也是 Expression,但是它是 Unevaluable,为了获取属性在输入 Row 中对应的值,需要对 AttributeReference 再进行一次 BindReferences 的转化,生成 BoundReference ,这个操作本质就是将 Expression 和一个输入 Scheme 进行关联,Scheme 有一组 AttributeReference ,它们之间是有顺序的,通过获取 AttributeReference 在 AttributeReference 组中的 Index,并生成 BoundReference,在对 BoundReference 进行 eval 时候,即可以使用该 index 获取它在相应 Row 中的值。
下面是 SQL 语句中 Attribute 经过的多次转化的过程:
SQL---parser---->UnresolvedAttribute----Analyser--->AttributeReference(resolved) ----Bind---->BoundReference
2.2 Expression Codegen
Spark Sql 是支持 Expression 层面代码生成,首先第一个疑问是我们知道 Expression 有 eval 方法来暴露 Expression 的功能,为什么还要搞一个 Codegen 呢?原因大体上有两个,一是提高性能,二是支持一些不能通过 eval 来表达的功能。这里主要解释了第一个,即提高性能,ps:我还没有去详细的测试这一块对性能影响大小,只是从感官上做一个结论:
基于 eval,Expression 执行过程中应该是这样的 e1.eval(e2.eval(e3.eval(e4.eval(...)))),随着 Expression 的复杂度上升,这个执行深度应该会很大,而且每一个操作之间是基于参数和返回进行传递,在操作系统系统层面是存在开销的;其次如果一个 Expression 计算结果会被多次利用,它的结果是没有办法被二次利用。
那么代码生成是这么解决这个问题的呢?
对于一个完整的 SQL 的执行,其中所有 Expression 只是对应一个最终执行代码的一个片段,注意是代码片段,而不是函数,更加不是一个类,每个代码片段由 ExprCode 来表示
case class ExprCode(var code: String, var isNull: String, var value: String)
code 为代码片段的计算代码,和 eval 函数功能一样,这个片段核心功能是对一个 row 进行处理,并把处理结果的值写到一个变量中,这个变量名称即为 value 表示,isNull 为 false 或者 true 字符串来表示这个代码片段生成的 value 对应的值是否为 Null。 如果由 3 个 Expression,分别为 exp1:ExprCode(code1,isnull1,value1) exp2:ExprCode(code2,isNull2,value2) exp2:ExprCode(code3,isNull3,value3) 在 SQL 执行过程中,针对一个 Row 会生成下面的组合代码 funtion(row) { //定义三个变量 exp1.dataType value1 = defauleValue(exp1.dataType) exp2.dataType value2 = defauleValue(exp1.dataType) exp3.dataType value3 = defauleValue(exp1.dataType) // exp1.code1 exp2.code2//可以使用 value1 变量的值 exp3.code3//可以使用 value1 和 value2 的值 .... }
Expression 层面的 Codegen 的实现其实很简单,这里就是不详细去描述,后面会重新针对 codegen,包括 Whole Stage Codegen 一起做一次大大专题进行分析。
3. LogicalPlan
如上所言,在 SQL 语句中,除了 SELECT FROM 等关键字以外,其他大部分元素都可以理解为 Expression,那么用什么来表示剩下的 SELECT FROM 这些关键字呢?毕竟 Expression 只是一些 Eval 功能函数或者代码片段,需要一个东西来串联这些片段,这个东西就是 Plan ,注意,我这里说的是 Plan ,是一个统称,而不是仅指向 LogicalPlan 。如开言图所示,在 SQL 解析以及优化,到最后过程中,都是以 Plan 而存在,只是每一步 Plan 功能不同而已。
Plan 表现形式也是 Tree,节点之间的关系可以理解为一种操作次序,比如 Plan 叶子节点表示从磁盘读取 DB 文件,而 Root 节点表示最终数据的输出;下面是 Plan 最常见的实例截图。
用 SQL 语句来表示这个 Plan 即为: SELECT project FROM table, table WHERE filter 。
Expression 功能是对输入 Row 进行加工,输出可能是 Any 数据类型。而 Plan 输出类型为 def output: Seq[Attribute] 表示的一组 Attribute,比如上面的 Project 和 Table 肯定是输出一个由 Seq[Attribute] 类型表示的 Row, Filter 感觉是输出 Ture/False,但是这里说的 Plan,而不是 Filter 类型的 Expreesion,Filter 类型的 Plan 会在内部根据 Expression 计算结果来判断是否返回 Row,但是 Row 返回的类型肯定也是由 Seq[Attribute] 表示的。
另外 Expression 与 Plan 关系是被包含,比如 Filter 肯定是基于一个 Expression 计算结果进行判断, Project 内部元素要么是直接为 Star ,或者为 Attribute ,或者为复杂的 Expression ,比如 SUM 。
下面我开始分析 LogicalPlan ,它是 SQL 语句经过 Parse 以后第一步展现形式。基于 ANTLR 实现的 SQL AST 分析过程即为 AST 树遍历过程,Catalyst 也是对 AST 树遍历过程中,完成 LogicalPlan 和所有依赖的 Expression 的构建,相关逻辑在 org.apache.spark.sql.catalyst.parser.AstBuilder 以及相关子类中,如果对 ANTLR 有一点基础,这一块代码还是比较容易看懂,就不细化分析,我们着重放在 LogicalPlan 上面。
在上面示意图中,我们看到 LogicalPlan 是由一些节点组成,在 Spark SQL 中,节点大体分为两种类型:Operator 和 Command。其中我们上面看到的 Filter 都可以理解为 Operator,而我们在 SQL Cli 中执行 set a=b 以及 addjar a ,它们都是 Command 类型的 Plan,当然相比由很多 Operator 组成的多级复杂 Plan,Command 组成的 Plan 可能是单节点而存在,要简单一些,下面我们对 Operator 做一些归类。
Name 功能描述
Project (projectList: Seq[NamedExpression], child: LogicalPlan) SELECT 语句输出操作,其中 projectList 为输出对象,每一个都为一个 Expression,它们可能是 Star,或者很复杂的 Expression
Filter (condition: Expression, child: LogicalPlan) 根据 condition 来对 Child 输入的 Rows 进行过滤
Join (left: LogicalPlan,right: LogicalPlan,joinType: JoinType,condition: Option[Expression]) left 和 right 的输出结果进行 join 操作
Intersect (left: LogicalPlan, right: LogicalPlan) left 和 right 两个 Plan 输出的 rows 进行取交集运算。
Except (left: LogicalPlan, right: LogicalPlan) 在 left 计算结果中剔除掉 right 中的计算结果
Union (children: Seq[LogicalPlan]) 将一组 Childs 的计算结果进行 Union 联合
Sort (order: Seq[SortOrder],global: Boolean, child: LogicalPlan) 对 child 的输出进行 sort 排序
Repartition (numPartitions: Int, shuffle: Boolean, child: LogicalPlan) 对 child 输出的数据进行重新分区操作
InsertIntoTable (table: LogicalPlan,child: LogicalPlan,...) 将 child 输出的 rows 输出到 table 中
Distinct (child: LogicalPlan) 对 child 输出的 rows 取重操作
GlobalLimit (limitExpr: Expression, child: LogicalPlan) 对 Child 输出的数据进行 Limit 限制
Sample (child: LogicalPlan,....)
Aggregate (groupingExpressions: Seq[Expression],aggregateExpressions: Seq[NamedExpression],child: LogicalPlan)
根据一些参数,从 child 输出的 Rows 进行一定比例的取样
对 child 输出 row 进行 aggregate 操作,比如 groupby 之类的操作
这些 Operator 共同组成 SELECT SQL 语句中各种核心语言要素,而且 Catatyst 后面的所有优化逻辑都是针对 SELECT 语句进行优化。对于譬如 CREATE TABLE 以及 SET 之类的 SQL 语言元素,它们都是 Command 存在,相比 SELECT,Command 组成的 Plan 要简单很多,不过它的种类倒是真的不少!
Name 功能描述
DataBase 操作类 支持 ShowDatabase 以及 UseDatabase 以及 Create 等操作
Table 操作类 多达 13 种,比如 Create,Show,Alter 等
View 操作类 CreateViewCommand 支持 View 的创建
Partition 操作类 支持 Partition 新增删除等操作
Resources 操作类 比如 AddJar 之类的资源操作
Functions 操作类 支持新增函数,删除函数等操作
Cache 操作类
Set 操作
支持对 Table 进行 cache 和 uncache 操作
通过 SetCommand 执行对参数进行临时修改
由 Operator 组成的 Plan,仅仅只是一组描述形式的而存在,毕竟只是 LogicalPlan ,它们需要被转换为最终的 PhysicalPlan 才能真正具有可执行的能力,而这些 Command 类型的 Plan 都是以 def run(sparkSession: SparkSession): Seq[Row] 函数暴露给 Spark SQL,比如通过调用 Table 的 run 函数完成 Table 的创建等操作。因此我们可以肯定的是:Plan 优化都是针对以 Operator 组成的 Plan。
4. Expression 和 Plan 的 Tree 支持的操作
在 Catalyst 中,Expression 和 Plan 都为 Tree 类型的数据结构,无论是从 SQL 语句中解析出 Plan 或者 Expression,或针对 Plan 或 Expression 进行 Analy 以及 Optimize 操作,都需要针对 Tree 数据结构进行遍历,其中经典 Tree 遍历算法有先序和后序遍历。
另外由于 TreeNode 节点本身类型为 Product (何为 Product ?在 Scala 中 Product 是最基本数据类型之一,如果一个 Case Class 继承 Product ,那么即可以通过 productElement 函数或者 productIterator 迭代器对 Case Class 的参数信息进行索引和遍历),并且所有 Expression 和 Plan 都是属于 Product 类型,因此可以通过 TreeNode 内部定义的 mapProductIterator 函数对节点参数进行遍历。在遍历过程中,可以针对参数类型进行 Case 过滤,从而有选择的处理本次希望处理的数据,比如对一个 Expression 中所有类型为 Expression 的子表达式进行操作,而可以忽略其他类型的参数。
对 Plan 或 Expression 进行遍历的目的:首先是为了收集一些信息,比如针对 Tree 进行 map/foreach 操作;其次是为了对 Tree 节点内部的信息进行修改,比如对 PlanTree 中每个 Plan 节点内部引用的 Attribute 进行 Revole 操作;最后就是为对 Tree 的数据结构进行修改,比如删除 Tree 的子节点,以及与子节点进行合并,比如 Catasylt Optitimze 就有大量 Tree 结构的修改。
Catalyst 在实现 Tree 的操作上,代码很是优雅的主要原因:它是基于 Scala 来实现。Scala 的偏函数 PartialFunction(偏函数是对函数定义域的一个子集进行定义的函数。 scala 中用 scala.PartialFunction[-T, +S] 类来表示)可以清晰的描述操作目的,比如 PartialFunction[LogicalPlan, LogicalPlan] 是针对 Plan 类型的节点进行操作,而 PartialFunction[Expression, Expression] 是针对 Expression 进行操作;其次 Scala 强大的 Case 正则匹配,让在对 Tree 进行遍历过程,可以清晰确定这一次需要操作的对象,如果用别的语言来实现下面 TypeFilter 合并,其代码将会是何等的苦涩。 case t1 @ TypedFilter(_, _, t2 @ TypedFilter(_, _, child)) if t1.deserializer.dataType == t2.deserializer.dataType => TypedFilter(combineFilterFunction(t2.func, t1.func), t1.deserializer, child) //优雅 Case 结构匹配语句,以及简洁的 CaseIF 判断,让快速定位相应的逻辑节点,并对节点数据进行修改变的如此简单。
同时无论是对 Expression 进行操作还是对 Plan 进行操作,Catalyst 将他们都抽象为 Rule ,它的 apply 函数通过传入一个 TreeType 类型的元素,并输出一个 TreeType 类型的元素。 abstract class Rule[TreeType <: TreeNode[_]] extends Logging { val ruleName: String def apply(plan: TreeType): TreeType }
在对一个元素进行 rule.apply 操作以后,可以针对前后的元素是否相等 curPlan.fastEquals(lastPlan) ,来确定该 rule 是否对该元素有效,其中无效可能是因为该 rule 没有 case 匹配到相应信息,首先可能是真的没有,其次 rule 是可以反复的应用到一个节点,直到节点状态趋向稳定,即 rule 已经应用多次,已经找不到匹配的信息了。
另外可以将一组 Rule 组合为一个 Batch(name: String,rules: Rule[TreeType]*) 并把它封装在 RuleExecutor 中,从而通过 RuleExecutor 将该组 Rule 的可执行接口提供给外部使用,比如大名顶顶的 Optimize 策略,就是一堆堆的 Batch 组成。 abstract class Optimizerextends RuleExecutor[LogicalPlan] { def batches: Seq[Batch] = { Batch("Finish Analysis", Once, EliminateSubqueryAliases, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase(sessionCatalog), RewriteDistinctAggregates) :: ....
如上所言, Rule 是通过反复的应用在一个节点,直到节点状态趋向稳定。但是如优化类型的 Rule ,很多时候,优化是没有止境了,优化的越深,优化开销就越大。因此我也需要一定的手段来控制 Batch 应用到何种程度,比如 Once extends Strategy 表示该 Batch 只允许应用一次;而 FixedPoint extends Strategy 表示该 Batch 最多只允许应用 N 次,当然如果 batch 在运行过程中,节点已经 稳定 ,会立即退出尝试的。
Spark SQL 对 Plan Tree 或者内部 Expression Tree 的遍历分为几个阶段:对 AST 进行 Parse 操作,生成 Unresolve Plan;对 Unresolve Plan 进行 Analy(包括 Resolve) 操作,生成 Logical Plan;对 Logical Plan 进行 Optimize 操作,生成 Optimized Logical Plan;以及最后进行 Planning 操作,生成 Physical Plan。这里面的每一阶段都可以简述为应用一组 BatchRule 来对 plan 进行加工,但是里面每一个 Rule 都是很值得去细节学习和分析的,实话,我也没有一个一个去看!!! 本文主要是针对 catalyst 内部实现做了一些简洁的分析,注重分析与 catalyst 相关的三个概念 Row,Expression,Plan ,因为对三个概念的理解与否,将决定是否可以看懂 spark sql 后面相关细节。 同时,Spark SQL 真的很复杂,如果想真的完全理解 Spark SQL 内部的具体细节,这条路还是很长!fighting!
大数据
2018-07-24 16:25:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
经过这么几天的折腾,发现 Sqoop1 真的比 Sqoop2 方便好用的多,Sqoop2 坑真是太多了,搞不定。Sqoop1 坑少也稳定,但是零基础使用过程中也是有几点需要注意的。
官方下载: Sqoop 官网
官方使用文档
Sqoop-1.4.6安装部署及详细使用介绍
如果像我一样直接用CDH里边自带的话,方便的地方是环境变量什么的不需要我再去配置了,很方便。
要检查安装成功没,直接 sqoop version ;
命令行使用
1 :
可能会出现警告:
accumulo does not exist! Accumulo imports will fail.
hcatalog does not exist! hcatalog imports will fail.
方法一:
CDH官方方法 ,如下:
Services that you want to use with Sqoop, such as HBase, Hive HCatalog, and Accumulo. When you run Sqoop, it checks to see if these services are installed and configured. It logs warnings for services it does not find. These warnings, shown below, are harmless. You can suppress these error messages by setting the variables $HBASE_HOME, $HCAT_HOME and $ACCUMULO_HOME to any existing directory .
即:只需要把 $HBASE_HOME, $HCAT_HOME and $ACCUMULO_HOME 设置为任意一个存在的目录即可。比如我: export ACCUMULO_HOME=/root , 果然就不报警告了,非常简单方便,强烈推荐。而且使用方法二的话直接修改了配置文件很有可能会出问题,不安全
方法二:
如果不需要,可以屏蔽它;
到 “SQOOP_HOME/bin” 下编辑 configure-sqoop,注释掉里边与Accumulo 和 hcatalog 有关的代码块,不是太难,自己打开看看这个文件就知道了。
2:要注意 “–target-dir” 需要指定一个空的不存在的文件夹,不能指定已经存在的文件夹,它会自己创建的
3:一个 从 Oracle 到 HDFS 的 import 语句: sqoop import --connect jdbc:oracle:thin:@:1521:app --username user --P --query "select * from HD.STORE where \$CONDITIONS and RCVTIME < TO_TIMESTAMP('2017-05-30 00:00:00','yyyy-mm-dd hh24:mi:ss.ff')" --split-by id --direct --target-dir /user/root/store --m 1 1
这里边几点需要注意的坑: “–connect “: CDH Sqoop1 使用 里边用一些 JDBC Connection Strings 的语法介绍和例子,但是 Oracle 的我试了不行,正确的写法应该如: jdbc:oracle:thin:@:: ,区别在于 thin 后边是需要又一个 冒号 的,很容易错,还有 与 中间的连接也是冒号,因为有的写法是 “/“,但是这里需要注意的是,这里是 冒号。 “–query“: 如果这里SQL语句里边用过单引号了,那外边必须要用双引号 这里的SQL假如直接使用 “FROM STORE“的话,是会报 table 找不见,或者不存在的,因为使用“–query“的话没有指定 Schema ,所以这里必须使用 . 的形式

where 后边必须有 $CONDITIONS 条件,sqoop 运行的时候,看日志发现sqoop 会在这里插入(1=0)或(1=1)来控制这条语句的执行。外边使用双引号的话,$CONDITIONS 前边需要加反斜杠 即:\$CONDITIONS。
Free form query in Sqoop Import with WHERE clause “–split-by” :需要指定一个 int 类型的列名,一般是主键。sqoop 会计算这个字段的 MIN 和 MAX ,然后结合 fetchSize 来确定 怎么切分数据块。这个字段必填。 “–direct“:没加这个之前,导入特别慢,中间经常会出现 “Connect reset“,这个没关系,一会儿它又回自动连接上。但是确实太慢了。使用这个字段后,导入速度又快又稳定,这个字段代表的意思应该是 使用的是关系数据库自带的导入导出工具。最好加上这个字段配置。
–options-file 使用
为了方便复用,我们可以把一串命令和参数写入txt文件中,使用 –options-file 来调用。详细使用方法, 官方文档的 6.4 Using Options Files to Pass Arguments 里写的特别清楚了。因为是英文的,我这里简单给英文不好的同学说一下要点和更命令行方式对比需要注意的:
先上一个等同上面命令行例子的例子。
store_import.txt # # import options file of STORE table # # # oracle table STORE to hive table * # import --connect jdbc:oracle:thin:@:1521:app --username user --password 123456 --query select * from HD.STORE where $CONDITIONS and \ RCVTIME < TO_TIMESTAMP('2017-05-30 00:00:00','yyyy-mm-dd hh24:mi:ss.ff') --split-by FLOWNO --direct --target-dir /user/root/store --m 1
用时, $ sqoop --options-file /users/home/store_import.txt 1 文件要是 txt 文件形式 里边的参数和值的顺序跟命令行里的一行,但是都必须如上一样换行,一行一个 这里密码我们用了 --password 直接写入密码,方便自动执行;命令行里我们用的是 -P ,用 -P 为了安全,在命令行执行时需要手动输入密码,而且不会明文显示 变化较大的是 --query 参数后边的SQL语句,需要注意: 参数和值一般一行一个,但是如果就像SQL语句一样,长的话可以在结尾使用反斜杠来换行 一般不要用双引号,除非双引号里边包含单引号,使用双引号的话就不能再使用换行符了,双引号必须在同一行。 这里的 $CONDITIONS 前边不再需要反斜杠来转换 $ 符号了
上面的例子都是从 oracle 导入 hdfs 的,现在我们用 sqoop 来直接导入 Hive。
导入数据到Hive
官方文档 7.2.12. Importing Data Into Hive
然后再上我的例子, import --hive-import --hive-table dw_hd.ods_store --connect jdbc:oracle:thin:@:1521:app --username user --password 123456 --query select * from HD.STORE where $CONDITIONS and \ RCVTIME < TO_TIMESTAMP('2017-05-30 00:00:00','yyyy-mm-dd hh24:mi:ss.ff') --split-by FLOWNO --direct --target-dir /user/root/store --null-string '\\N' --null-non-string '\\N' --m 2
sqoop 导入 Hive 分三步:
1. 先导入 --target-dir 指定的 HDFS 的目录中
2. 在 Hive 中建表
3. 调用 Hive 的 LOAD DATA INPATH 把 --target-dir 中的数据移动到 Hive 中
观察上边例子与前边的做对比,有以下几点不同: --hive-import :指定是导入 Hive --hive-table :导入 Hive 中的数据库名和表名 --null-string 和 --null-non-string :分别代表了 sqoop 对 字符串类型 null 值 和 非字符串类型 null 值 的处理。如果不指定的话,默认导入 Hive 后 字符串类型的 null 值是 ‘null’,非字符串类型 null 值是 ‘NULL’,这里用把这两种情况统一成了 ‘NULL’,sqoop 中用 ‘\N’,如果想要小写的 ‘null’ 的话,使用 ‘\N’。
问题1:导入后从Hive中查到的数据条数比实际从关系数据库中查到的条数多?
解决:原因是使用 --hive-import 会使用默认的 Hive 的分隔符,值分隔符 ^A 和行分隔符 \n 。 官方文档 7.2.12. Importing Data Into Hive 里原文如下:
Hive will have problems using Sqoop-imported data if your database’s rows contain string fields that have Hive’s default row delimiters ( \n and \r characters) or column delimiters ( \01 characters) present in them. You can use the --hive-drop-import-delims option to drop those characters on import to give Hive-compatible text data. Alternatively, you can use the --hive-delims-replacement option to replace those characters with a user-defined string on import to give Hive-compatible text data. These options should only be used if you use Hive’s default delimiters and should not be used if different delimiters are specified.
Sqoop will pass the field and record delimiters through to Hive. If you do not set any delimiters and do use --hive-import , the field delimiter will be set to ^A and the record delimiter will be set to \n to be consistent with Hive’s defaults.
这样问题就来了,如果导入的数据中有’\n’,hive会认为一行已经结束,后面的数据被分割成下一行。这种情况下,导入之后hive中数据的行数就比原先数据库中的多,而且会出现数据不一致的情况。
Sqoop也指定了参数 --fields-terminated-by 和 --lines-terminated-by 来自定义行分隔符和列分隔符。
可是当你真的这么做时 坑爹呀!
INFO hive.HiveImport: FAILED: SemanticException 1:381 LINES TERMINATED BY only supports newline ’\n’ right now. 也就是说虽然你通过 --lines-terminated-by 指定了其他的字符作为行分隔符,但是hive只支持 \n 作为行分隔符。 ORACLE中查询某个字段包含 回车 换行 符, || 不是或,是 oracle 中的字符串连接符。 % 是通配符,代表任意字符串。
查看是否包含 回车换行 符,即: \r\n
select * from system.test_tab1 where name like '%'||chr(13)||chr(10)||'%'
单独查看是否包含 回车换行 符,即: \r
select * from system.test_tab1 where name like '%'||chr(13)||'%'
单独查看是否包含 换行 符,即: \n
select * from system.test_tab1 where name like '%'||chr(10)||'%'
解决方法:简单的解决办法就是加上参数 --hive-drop-import-delims 来把导入数据中包含的hive默认的分隔符去掉。这个最简单,如果确定数据中不该含有这些字符的话,或者确定去掉没影响的话,可以用这个。另外,使用这个就没法使用 --direct 选项了。
增量导入
7.2.9. Incremental Imports
问题:–incremental lastmodified 模式 不支持 导入Hive
--incremental lastmodified option for hive imports is not supported. Please remove the parameter --incremental lastmodified.
解决办法:使用导入HDFS 的方法,只不过 --target-dir 设置成 Hive table 在 HDFS 中的位置,类似于 /user/hive/warehouse/store ,前提 Hive 中已经存在这个表了,不存在的话先CREATE。
sqoop incremental import to hive table
问题1:导入后每一行所有数据都在第一个字段里?
原因和解决:因为直接导入 HDFS 中 HIve 里的文件夹下的话,sqoop 默认给的 值分隔符 是逗号 , ,而 Hive 默认值分割符是 \001 ,即: ^A ,所以 Hive 是不认的,所以 需要 把值分隔符改成 ^A ,即加上下边的配置: --fields-terminated-by \001
问题2:还是上边那个问题,导入后从Hive中查到的数据条数比实际从关系数据库中查到的条数多?
原因和解决:这样子增量导入时,因为直接是导入 hdfs 的,不是 --hive-import ,所以就没法 用 --hive-drop-import-delims 去掉冲突的分隔符了。所以就又会出现上边那个问题。解决办法是:在 --query 后的SQL语句里做 replace 方法嵌套做替换,把 \r 和 \n 分别替换为空字符串, REPLACE(REPLACE(name,CHR(13),''),CHR(10),'') ,如下: --query select REPLACE(REPLACE(name,CHR(13),''),CHR(10),'') name,code from test
定义 sqoop job 增量导入
文档 12. Saved Jobs
用 sqoop job 做增量更新,它会在它的 metastore 中管理 --last-value ,很方便。 sqoop job --create test_import_store -- --options-file /root/sqoop_option_files/store_import.txt 1
这里有一个很弱智的坑,很容易栽进来,困了我快一天。
一般网上定义job的例子如下: sqoop job --create user_info -- import --connect jdbc:mysql://mysql-server-ip:3306/sqoop --username root --password root --table user_info -m 1 1
看这里, sqoop job --create user_info -- import ,看到了吗,’–’ 和 ‘import’ 之间有一个空格,很容易忽视的一点。这里用 ‘–’ 来指示 job 的内容,第一次见,很容易把 –import 当成一个整体。不注意这个,就会一直报 不能识别的参数
ERROR tool.BaseSqoopTool: Unrecognized argument
这里我觉得,sqoop job 和 –options-file 一块用才更配,很方便去编辑 job,如下一样的模版,很容易避开上边那个坑。 sqoop job --create -- --options-file 1
在metastore中保存密码
默认每次执行 job 时都需要手动输入密码。这样很不利于自动化任务和 Oozie 调度,所以需要设置在 sqoop metastore 中保存密码。
You can enable passwords in the metastore by setting sqoop.metastore.client.record.password to true in the configuration.
解决方式有两种:
第一种方式,使用password-file;
第二种方式,在sqoop-site.xml中添加如下属性即可(添加后第一次仍然需要输入密码 )。 sqoop.metastore.client.record.password true sqoop 常用命令
Sqoop详细介绍包括:sqoop命令,原理,流程
Sqoop使用手册
安装Hue后的一些功能的问题解决干货总结(博主推荐)
大数据
2018-07-24 17:21:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.term query
term是代表完全匹配,也就是精确查询,搜索前不会再对搜索词进行分词,所以我们的搜索词必须是文档分词集合中的一个。 /** * 词项查询 */ public static void termQuery() { //精确匹配,不会再对搜索词进行分词,搜索词必须是文档分词集合的一部分就会被查到。 QueryBuilder query = QueryBuilders.termQuery("title", "董事长"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { termQuery(); }
执行结果: { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
2.terms query
多个搜索词,只要任意一个满足条件 /** * 多搜索词查询 */ public static void termsQuery(){ //精确匹配,搜索文档的title字段的分词集合中包含任意搜索词 QueryBuilder query = QueryBuilders.termsQuery("title","主席","合作"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { termsQuery(); }
查询结果: {"title":"碧桂园集团副主席杨惠妍","content":"杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股","author":"小财注","pubdate":"2018-07-17T16:12:55"} {"title":"河北聚焦十大行业推进国际产能合作","content":"河北省政府近日出台积极参与“一带一路”建设推进国际产能合作实施方案","author":"财联社","pubdate":"2018-07-17T14:14:55"}
3.range query
根据范围查询文档 /** * 根据范围查询文档 */ public static void rangeQuery() { //根据范围查询,这里from...to...相当于gte...lte... QueryBuilder query = QueryBuilders.rangeQuery("pubdate").from("2018-07-17T12:0:0").to("2018-07-17T17:18:30") .format("yyyy-MM-dd'T'HH:mm:ss"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { rangeQuery(); }
查询结果: {"title":"周五召开董事会会议 审议及批准更新后的一季报","content":"以审议及批准更新后的2018年第一季度报告","author":"中兴通讯","pubdate":"2018-07-17T12:33:11"} {"title":"碧桂园集团副主席杨惠妍","content":"杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股","author":"小财注","pubdate":"2018-07-17T16:12:55"} {"title":"宝泰隆:半年报预增140%-156%","content":"公司主要产品焦炭、甲醇销售量及销售价格较上年同期有较大的上涨","author":"宝泰隆","pubdate":"2018-07-17T17:16:30"} {"title":"河北聚焦十大行业推进国际产能合作","content":"河北省政府近日出台积极参与“一带一路”建设推进国际产能合作实施方案","author":"财联社","pubdate":"2018-07-17T14:14:55"}
4.exists query
查询判断文档的指定含有非空的值。
5.prefix query
查询文档指定字段的分词集合,以搜索词开头 /** * 以搜索词开头 */ public static void prefixQuery(){ QueryBuilder query = QueryBuilders.prefixQuery("title","报"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { prefixQuery(); }
搜索结果: { "title":"二甲苯销售报价大涨9.12%", "content":"财联社7月24日讯,据百川资讯,今日PX(对二甲苯)报价8140元/吨,涨9.12%,丁二烯报价1.33万元/吨,涨3.1%;维生素E涨2.56%;氧化钼涨2.4%;电解锰涨2.13%", "author":"财联社" }
6.通配符查询
可以使用*代表所有字符进行通配符查询 /** * 通配符查询 */ public static void wildcardQuery() { //通配符查询 QueryBuilder query = QueryBuilders.wildcardQuery("title", "*审*"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { wildcardQuery(); }
查询结果: {"title":"周五召开董事会会议 审议及批准更新后的一季报","content":"以审议及批准更新后的2018年第一季度报告","author":"中兴通讯","pubdate":"2018-07-17T12:33:11"} { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
7.正则匹配查询
根据正则表达式进行匹配 /** * 正则匹配查询 */ public static void regexpQuery(){ //根据正则表达式搜索文档 QueryBuilder query = QueryBuilders.regexpQuery("title","二甲苯.*"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { regexpQuery(); }
执行结果: { "title":"二甲苯销售报价大涨9.12%", "content":"财联社7月24日讯,据百川资讯,今日PX(对二甲苯)报价8140元/吨,涨9.12%,丁二烯报价1.33万元/吨,涨3.1%;维生素E涨2.56%;氧化钼涨2.4%;电解锰涨2.13%", "author":"财联社" }
8.相似度查询
查询和搜索词相似度比较高的文档 /** * 查询相似度比较近的文档 */ public static void fuzzyQuery(){ //查询title包含和titlt比较相近的文档 QueryBuilder query = QueryBuilders.fuzzyQuery("title","titlt"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { fuzzyQuery(); }
查询结果 {"title":"被更新之后title","content":"测试添加内容"}
9.ids查询
根据文档Id列表查询文档 /** * 根据id列表查询 */ public static void idsQuery(){ //根据文档Id列表查询 QueryBuilder query = QueryBuilders.idsQuery().addIds("2","AZetp2QBW8hrYY3zGJk7","BZfVymQBW8hrYY3zN5kH"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { idsQuery(); }
执行结果 {"title":"周五召开董事会会议 审议及批准更新后的一季报","content":"以审议及批准更新后的2018年第一季度报告","author":"中兴通讯","pubdate":"2018-07-17T12:33:11"} {"title":"被更新之后title","content":"测试添加内容"} { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
大数据
2018-07-24 16:16:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.查询全部 /** * 分页查询索引中数据 * * @param indexName * @param size * @param form * @return */ public static List matchAllQuery(String indexName, int size, int from) { QueryBuilder query = QueryBuilders.matchAllQuery(); SearchResponse response = getClient().prepareSearch(indexName).setQuery(query).setFrom(from).setSize(size) .get(); SearchHits htis = response.getHits(); List ts = new ArrayList(); Telegraph t; for (SearchHit searchHit : htis) { System.out.println(searchHit.getSourceAsString()); t = new Telegraph(); t.setTitle((String) searchHit.getSourceAsMap().get("title")); t.setContent((String) searchHit.getSourceAsMap().get("content")); t.setAuthor((String) searchHit.getSourceAsMap().get("author")); t.setPubdate((String) searchHit.getSourceAsMap().get("pubdate")); ts.add(t); } return ts; }
测试分页查询 public static void main(String[] args) { List list = matchAllQuery("telegraph", 5, 0); for (Telegraph telegraph : list) { System.out.println("title:"+telegraph.getTitle()+",content:"+telegraph.getContent()+",author:"+telegraph.getAuthor()); } }
2.全文搜索
执行全文查询的标准查询:”title“字段中包含”董事“的文档 /** * 根据查询条件查询 */ public static void matchQuery() { QueryBuilder query = QueryBuilders.matchQuery("title", "董事"); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits searchHits = response.getHits(); for (SearchHit searchHit : searchHits) { System.out.println(searchHit.getSourceAsString()); } }
测试 public static void main(String[] args) { matchQuery(); }
执行结果: {"title":"周五召开董事会会议 审议及批准更新后的一季报","content":"以审议及批准更新后的2018年第一季度报告","author":"中兴通讯","pubdate":"2018-07-17T12:33:11"} { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
同时满足”董事“、”高管“条件 /** * 根据查询条件查询 */ public static void matchQuery() { QueryBuilder query = QueryBuilders.matchQuery("title", "董事高管").operator(Operator.AND); SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits searchHits = response.getHits(); for (SearchHit searchHit : searchHits) { System.out.println(searchHit.getSourceAsString()); } }
执行结果: { "title":"董事长、3名公司高管和2名中层人员被公安机关依法审查", "content":"财联社7月24日讯,长生生物公告称,董事长及部分高管无法正常履职,长春市长春新区公安分局对长春长生生产冻干人用狂犬病疫苗涉嫌违法犯罪案件立案调查,将主要涉案人员公司董事长、3名公司高管和2名中层人员带至公安机关依法审查", "author":"长生生物" }
查询”title“或者”content“任一满足条件”销售“的文档 /** * 根据查询条件查询 */ public static void matchQuery() { //QueryBuilder query = QueryBuilders.matchQuery("title", "董事高管").operator(Operator.AND); QueryBuilder query = QueryBuilders.multiMatchQuery("销售", "title","content");// SearchResponse response = getClient().prepareSearch("telegraph").setQuery(query).setSize(5).get(); SearchHits searchHits = response.getHits(); for (SearchHit searchHit : searchHits) { System.out.println(searchHit.getSourceAsString()); } }
查询结果; {"title":"宝泰隆:半年报预增140%-156%","content":"公司主要产品焦炭、甲醇销售量及销售价格较上年同期有较大的上涨","author":"宝泰隆","pubdate":"2018-07-17T17:16:30"} { "title":"二甲苯销售报价大涨9.12%", "content":"财联社7月24日讯,据百川资讯,今日PX(对二甲苯)报价8140元/吨,涨9.12%,丁二烯报价1.33万元/吨,涨3.1%;维生素E涨2.56%;氧化钼涨2.4%;电解锰涨2.13%", "author":"财联社" }
大数据
2018-07-24 14:09:00
「深度学习福利」大神带你进阶工程师,立即查看>>> 创建kafka topic 查看所有topic列表 查看指定topic信息 控制台向topic生产数据 控制台消费topic的数据 查看topic某分区偏移量最大(小)值 增加topic分区数 删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除 查看topic消费进度
创建kafka topic bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partitions 30 --replication-factor 2
注: partitions指定topic分区数,replication-factor指定topic每个分区的副本数 partitions分区数: partitions :分区数,控制topic将分片成多少个log。可以显示指定,如果不指定则会使用broker(server.properties)中的num.partitions配置的数量 虽然增加分区数可以提供kafka集群的吞吐量、但是过多的分区数或者或是单台服务器上的分区数过多,会增加不可用及延迟的风险。因为多的分区数,意味着需要打开更多的文件句柄、增加点到点的延时、增加客户端的内存消耗。 分区数也限制了consumer的并行度,即限制了并行consumer消息的线程数不能大于分区数 分区数也限制了producer发送消息是指定的分区。如创建topic时分区设置为1,producer发送消息时通过自定义的分区方法指定分区为2或以上的数都会出错的;这种情况可以通过alter –partitions 来增加分区数。 replication-factor副本 replication factor 控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数。 如果没有在创建时显示指定或通过API向一个不存在的topic生产消息时会使用broker(server.properties)中的default.replication.factor配置的数量
查看所有topic列表 bin/kafka-topics.sh --zookeeper node01:2181 --list
查看指定topic信息 bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic t_cdr
控制台向topic生产数据 bin/kafka-console-producer.sh --broker-list node86:9092 --topic t_cdr
控制台消费topic的数据 bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic t_cdr --from-beginning
查看topic某分区偏移量最大(小)值 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list node86:9092 --partitions 0
注: time为-1时表示最大值,time为-2时表示最小值
增加topic分区数
为topic t_cdr 增加10个分区 bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic t_cdr --partitions 10
删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除 bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper node01:2181 --topic t_cdr
查看topic消费进度
这个会显示出consumer group的offset情况, 必须参数为--group, 不指定--topic,默认为所有topic
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker required argument: [group] Option Description ------ ----------- --broker-info Print broker info --group Consumer group. --help Print this message. --topic Comma-separated list of consumer topics (all topics if absent). --zkconnect ZooKeeper connect string. (default: localhost:2181) Example, bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv Group Topic Pid Offset logSize Lag Owner pv page_visits 0 21 21 0 none pv page_visits 1 19 19 0 none pv page_visits 2 20 20 0 none
分类: Kafka系列
标签: kafka , shell , 基本命令
大数据
2018-07-24 10:42:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少。最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助。
在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论。另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西。

一、 误区澄清与概念明确
1 Kafka的版本
很多人在Kafka中国社区(替群主做个宣传,QQ号:162272557)提问时的开头经常是这样的:“我使用的kafka版本是2.10/2.11, 现在碰到一个奇怪的问题。。。。” 无意冒犯,但这里的2.10/2.11不是kafka的版本,而是编译kafka的Scala版本。Kafka的server端代码是由Scala语言编写的,目前Scala主流的3个版本分别是2.10、2.11和2.12。实际上Kafka现在每个PULL request都已经自动增加了这三个版本的检查。下图是我的一个PULL request,可以看到这个fix会同时使用3个scala版本做编译检查:
目前广泛使用kafka的版本应该是这三个大版本:0.8.x, 0.9.x和0.10.* 。 这三个版本对于consumer和consumer group来说都有很大的变化,我们后面会详谈。
2 新版本 VS 老版本
“我的kafkaoffsetmonitor为什么无法监控到offset了?”——这是我在Kafka中国社区见到最多的问题,没有之一!实际上,Kafka 0.9开始提供了新版本的consumer及consumer group,位移的管理与保存机制发生了很大的变化——新版本consumer默认将不再保存位移到zookeeper中,而目前kafkaoffsetmonitor还没有应对这种变化(虽然已经有很多人在要求他们改了,详见 https://github.com/quantifind/KafkaOffsetMonitor/issues/79 ),所以很有可能是因为你使用了新版本的consumer才无法看到的。关于新旧版本,这里统一说明一下:kafka0.9以前的consumer是使用Scala编写的,包名结构是kafka.consumer.*,分为high-level consumer和low-level consumer两种。我们熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是这个版本提供的;自0.9版本开始,Kafka提供了java版本的consumer,包名结构是o.a.k.clients.consumer.*,熟知的类包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以单独部署,不再需要依赖server端的代码。

二、消费者组 (Consumer Group)
1 什么是消费者组
其实对于这些基本概念的普及,网上资料实在太多了。我本不应该再画蛇添足了,但为了本文的完整性,我还是要花一些篇幅来重谈consumer group,至少可以说说我的理解。值得一提的是,由于我们今天基本上只探讨consumer group,对于单独的消费者不做过多讨论。
什么是consumer group? 一言以蔽之,consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。(网上文章中说到此处各种炫目多彩的图就会紧跟着抛出来,我这里就不画了,请原谅)。个人认为,理解consumer group记住下面这三个特性就好了: consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程 group.id是一个字符串,唯一标识一个consumer group consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
2 消费者位置(consumer position)
消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在Kafka中这个位置信息有个专门的术语:位移(offset)。很多消息引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是实现简单,但会有三个主要的问题:1. broker从此变成有状态的,会影响伸缩性;2. 需要引入应答机制(acknowledgement)来确认消费成功。3. 由于要保存很多consumer的offset信息,必然引入复杂的数据结构,造成资源浪费。而Kafka选择了不同的方式:每个consumer group保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。
3 位移管理(offset management)
3.1 自动VS手动
Kafka默认是定期帮你自动提交位移的(enable.auto.commit = true),你当然可以选择手动提交位移实现自己控制。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:
上图中表明了test-group这个组当前的消费情况。

3.2 位移提交
老版本的位移是提交到zookeeper中的,图就不画了,总之目录结构是:/consumers/< group.id >/offsets//,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。因此kafka提供了另一种解决方案:增加__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例,格式大概如下:

__consumers_offsets topic配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的。compact的具体原理请参见: Log Compaction
至于每个group保存到__consumers_offsets的哪个分区,如何查看的问题请参见这篇文章: Kafka 如何读取offset topic内容 (__consumer_offsets)

4 Rebalance
4.1 什么是rebalance?
rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。
4.2 什么时候rebalance?
这也是经常被提及的一个问题。rebalance的触发条件有三种: 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到) 订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance 订阅主题的分区数发生变更
4.3 如何进行组内分区分配?
之前提到了group下的所有consumer都会协调在一起共同参与分配,这是如何完成的?Kafka新版本consumer默认提供了两种分配策略:range和round-robin。当然Kafka采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。实际上,由于目前range和round-robin两种分配器都有一些弊端,Kafka社区已经提出第三种分配器来实现更加公平的分配策略,只是目前还在开发中。我们这里只需要知道consumer group默认已经帮我们把订阅topic的分区分配工作做好了就行了。
简单举个例子,假设目前某个consumer group下有两个consumer: A和B,当第三个成员加入时,kafka会触发rebalance并根据默认的分配策略重新为A、B和C分配分区,如下图所示:

4.4 谁来执行rebalance和consumer group管理?
Kafka提供了一个角色:coordinator来执行对于consumer group的管理。坦率说kafka对于coordinator的设计与修改是一个很长的故事。最新版本的coordinator也与最初的设计有了很大的不同。这里我只想提及两次比较大的改变。
首先是0.8版本的coordinator,那时候的coordinator是依赖zookeeper来实现对于consumer group的管理的。Coordinator监听zookeeper的/consumers//ids的子节点变化以及/brokers/topics/数据变化来判断是否需要进行rebalance。group下的每个consumer都自己决定要消费哪些分区,并把自己的决定抢先在zookeeper中的/consumers//owners//下注册。很明显,这种方案要依赖于zookeeper的帮助,而且每个consumer是单独做决定的,没有那种“大家属于一个组,要协商做事情”的精神。
基于这些潜在的弊端,0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任,比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。显而易见,这种coordinator设计不再需要zookeeper了,性能上可以得到很大的提升。后面的所有部分我们都将讨论最新版本的coordinator设计。
4.5 如何确定coordinator?
上面简单讨论了新版coordinator的设计,那么consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步: 确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。 该分区leader所在的broker就是被选定的coordinator
4.6 Rebalance Generation
JVM GC的分代收集就是这个词(严格来说是generational),我这里把它翻译成“届”好了,它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3.

4.7 协议(protocol)
前面说过了, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题: Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着 LeaveGroup请求:主动告诉coordinator我要离开consumer group SyncGroup请求:group leader把分配方案告诉组内所有成员 JoinGroup请求:成员请求加入组 DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
Coordinator在rebalance的时候主要用到了前面4种请求。
4.8 liveness
consumer如何向coordinator证明自己还活着? 通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!
4.9 Rebalance过程
终于说到consumer group执行rebalance的具体流程了。很多用户估计对consumer内部的工作机制也很感兴趣。下面就跟大家一起讨论一下。当然我必须要明确表示,rebalance的前提是coordinator已经确定了。
总体而言,rebalance分为2步:Join和Sync
1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。
2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。
还是拿几张图来说明吧,首先是加入组的过程:
值得注意的是, 在coordinator收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。记得国内有篇文章以此来证明kafka开发人员都是很有文艺范的,写得也是比较有趣,有兴趣可以去搜搜。
然后是分发分配方案的过程,即SyncGroup请求:
注意!! consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。比如这种机制下我可以实现类似于Hadoop那样的机架感知(rack-aware)分配方案,即为consumer挑选同一个机架下的分区数据,减少网络传输的开销。Kafka默认为你提供了两种分配策略:range和round-robin。由于这不是本文的重点,这里就不再详细展开了,你只需要记住你可以覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。
4.10 consumer group状态机
和很多kafka组件一样,group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理,如下图所示(完全是根据代码注释手动画的,多见谅吧)

简单说明下图中的各个状态: Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求 PreparingRebalance:组准备开启新的rebalance,等待成员加入 AwaitingSync:正在等待leader consumer将分配方案传给各个成员 Stable:rebalance完成!可以开始消费了~
至于各个状态之间的流程条件以及action,这里就不具体展开了。

三、rebalance场景剖析
上面详细阐述了consumer group是如何执行rebalance的,可能依然有些云里雾里。这部分对其中的三个重要的场景做详尽的时序展开,进一步加深对于consumer group内部原理的理解。由于图比较直观,所有的描述都将以图的方式给出,不做过多的文字化描述了。
1 新成员加入组(member join)

2 组成员崩溃(member failure)
前面说过了,组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。okay,直接上图:
3 组成员主动离组(member leave group)
4 提交位移(member commit offset)


总结一下,本文着重讨论了一下新版本的consumer group的内部设计原理,特别是consumer group与coordinator之间的交互过程,希望对各位有所帮助。
大数据
2018-07-24 10:22:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
博主前面文章有介绍过软件的安装,可以帮助IT人员顺利的完成功能软件安装;但是,对于我们运维人员或者需要管理软件安装的项目经理来说,有些应用一次行需要搭建很多台相同的软件环境(如tomcat+java);那么,此时如果我们再一台一台去安装,那就显得效率太低;所以此篇文章博主将详细讲解如何通过自动化脚本来实现服务器批量安装软件环境。
步骤:
(1)首先,需要检查要要安装软件环境的服务器上是否安装了ssh服务端和客户端,没有安装的需要安装上。 rpm -qa|grep ssh 检查服务器上已经安装了的ssh相关软件 yum list|grep ssh 检查yum仓库中可用的ssh相关的软件包 yum -y install openssh-server 安装服务端 yum -y install openssh-clinets 安装客户端
(2)检查root用户是否可使用ssh连接:ssh root@192.168.29.135
注意:如果此处无法实现登录报远程登录限制,则需要在192.168.29.135的ssh配置文件中开启运行root用户远程登录。
(3)上传jdk软件包到httpd服务器(用于执行自动化脚本的服务器)Alt+p上传
(4)访问httpd服务:http://192.168.29.133/soft
(5)编写自动化脚本(并且赋予执行权限)
ssh远程登录其他服务器的脚本(/root/boot.sh) #!/bin/bash SERVERS="192.168.29.135 192.168.29.136" PASSWORD=hadoop BASE_SERVER=192.168.29.133 auto_ssh_copy_id() { expect -c "set timeout -1; spawn ssh-copy-id $1; expect { *(yes/no)* {send -- yes\r;exp_continue;} *assword:* {send -- $2\r;exp_continue;} eof {exit 0;} }"; } ssh_copy_id_to_all() { for SERVER in $SERVERS do auto_ssh_copy_id $SERVER $PASSWORD done } ssh_copy_id_to_all for SERVER in $SERVERS do scp install.sh root@$SERVER:/root ssh root@$SERVER /root/install.sh done
远程登录后调用该脚本进行自动化软件安装(/root/install.sh) #!/bin/bash BASE_SERVER=192.168.29.133 yum install -y wget wget $BASE_SERVER/soft/jdk-7u45-linux-x64.tar.gz tar -zxvf jdk-7u45-linux-x64.tar.gz -C /usr/local cat >> /etc/profile << EOF export JAVA_HOME=/usr/local/jdk1.7.0_45 export PATH=\$PATH:\$JAVA_HOME/bin EOF
执行命令赋予权限:chmod 777 /root/install.sh /root/boot.sh
(6)生成ssh秘钥对:ssh-kegen
(8)安装expect人机交互命令(此命令在自动化脚本中要使用)
(9)执行boot.sh进行自动化脚本安装软件:sh boot.sh
(10)安装完成后,去其他服务器source /etc/profile后验证jdk

至此,自动化脚本的安装运行整个过程完成。如果大家觉得不错,请点赞博主的文章;如果您对其他服务器技术感兴趣,请关注博主博客,并随时欢迎同博主本人交流。
大数据
2018-07-24 00:28:00
「深度学习福利」大神带你进阶工程师,立即查看>>> 李钰(社区ID:Yu Li),阿里巴巴计算平台事业部高级技术专家,HBase开源社区PMC&committer。开源技术爱好者,主要关注分布式系统设计、大数据基础平台建设等领域。连续4年基于HBase/HDFS设计和开发存储系统应对双十一访问压力,具备丰富的大规模集群生产实战经验
摘要
性能优化 针对IO的性能优化 不同版本值得注意的性能问题/优化
 监控和问题排查 Important metrics Logs and debugging
针对IO的性能优化


不同版本值得注意的性能问题
问题排查: 重要的监控指标




问题排查: Server端日志
问题排查:Client端日志
To Be Continued

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group ,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。
长按下面的二维码加入HBase技术社区微信群
大数据
2018-07-23 21:18:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.批量查询
Multi Get API public static void multiGet() { // 批量查询 MultiGetResponse response = getClient().prepareMultiGet() .add("my_person", "my_index", "1")// 查询id为1的文档 .add("my_person", "my_index", "2", "3", "4")// ids,id列表 .add("telegraph", "msg", "2")// 可以查询其他索引里面的数据 .get(); // 获取相应结果 for (MultiGetItemResponse multiGetItemResponse : response) { // 遍历结果集 GetResponse getResponse = multiGetItemResponse.getResponse(); if (getResponse.isExists()) {// 判断文档是否存在 String json = getResponse.getSourceAsString(); System.out.println(json); } } }
测试 public static void main(String[] args) { multiGet(); }
执行结果 { "name":"sean", "age":22, "salary":6000 } { "name":"sim", "age":20, "salary":5000 } { "name":"duck", "age":28, "salary":8000 } { "name":"lily", "age":20, "salary":4000 } {"title":"被更新之后title","content":"测试添加内容"}
2.批量操作
Bulk API /** * 批量操作 * @throws Exception */ public static void bulk() throws Exception { BulkRequestBuilder bulkRequest = getClient().prepareBulk(); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "3") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "控股股东涉嫌内幕交易 被证监会立案调查") .field("content", "财联社7月23日讯,嘉欣丝绸晚间公告,控股股东、董事长周国建因其涉嫌内幕交易,收到中国证监会的《调查通知书》,对其进行立案调查") .endObject())); bulkRequest.add(getClient().prepareIndex("telegraph", "msg", "4") .setSource(XContentFactory.jsonBuilder().startObject().field("title", "泛海控股股价13连阳 控股股东今日再增持213万股") .field("content", "财联社7月23日讯,泛海控股晚间公告,控股股东中国泛海于7月23日增持了213.16万股公司股份,约占公司股份总数的0.0410%,成交均价为6.798 元/股") .endObject())); // 批量执行 BulkResponse bulkResponse = bulkRequest.get(); System.out.println(bulkResponse.status()); // 判断是否存在失败操作 if (bulkResponse.hasFailures()) { System.out.println("存在失败操作"); } //遍历每个操作的执行结果 for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { System.out.println(bulkItemResponse.getResponse().toString()); } }
测试操作 public static void main(String[] args) { try { bulk(); } catch (Exception e) { e.printStackTrace(); } }
3.批量处理器(Bulk Processor)
BulkProcessor类提供了一个简单接口,可以根据请求的数量或大小自动刷新批量操作,也可以在给定的时间段之后自动刷新批量操作。 /** * 批量处理器 */ public static void bulkProcessor() { BulkProcessor.Listener listener = new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) { // 执行批量操作之前 System.out.println(request.numberOfActions()); } public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 执行批量操作之后,异常 System.out.println("执行错误:" + request.toString() + ",失败:" + failure.getMessage()); } public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 执行批量操作之后 for (BulkItemResponse bulkItemResponse : response.getItems()) { System.out.println("执行成功"+bulkItemResponse.getResponse().toString()); } } }; // 设置执行器,包含执行时执行过程的监听,以及执行属性配置 BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), listener).setBulkActions(500) // 设置批量处理数量的阀值 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))// 设置批量执执行处理请求大小阀值 .setFlushInterval(TimeValue.timeValueSeconds(5))// 设置刷新索引时间间隔 .setConcurrentRequests(1)// 设置并发处理线程个数 .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3))// 设置回滚策略,等待时间100,重试次数3 .build(); // 添加需要执行的请求 bulkProcessor.add(new DeleteRequest("telegraph", "msg", "3")); bulkProcessor.add(new DeleteRequest("telegraph", "msg", "4")); // 刷新请求 bulkProcessor.flush(); // 关闭执行器 bulkProcessor.close(); //刷新索引(没有这一步不执行) getClient().admin().indices().prepareRefresh().get(); }
测试 public static void main(String[] args) { try { bulkProcessor(); } catch (Exception e) { e.printStackTrace(); } }
4.查询删除
根据查询条件,删除满足条件的文档 /** * 根据查询条件删除文档 */ public static void deleteQuery() { //根据查询条件删除文档 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(getClient()) .filter(QueryBuilders.matchQuery("title", "长生生物")).source("telegraph").get(); System.out.println(response.getDeleted());// 删除文档数量 }
测试 public static void main(String[] args) { try { deleteQuery(); } catch (Exception e) { e.printStackTrace(); } }
大数据
2018-07-23 21:14:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.保存文档
可以通过json工具把java对象转换成json字符串进行保存,也可以通过内置的帮助类直接构建json格式 /** * 获取客户端 * * @return */ public static TransportClient getClient() { Settings settings = Settings.builder().put("cluster.name", "es_cluster").build(); TransportClient client = null; try { client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(InetAddress.getByName("10.118.213.215"), 9300)); } catch (UnknownHostException e) { e.printStackTrace(); } return client; } /** * 保存文档到索引中 * * @param indexName * @param typeName * @param id * @param docJson */ public static void insertDoc(String indexName, String typeName, String id, String docJson) { IndexResponse response = getClient().prepareIndex(indexName, typeName, id).setSource(docJson, XContentType.JSON) .get(); System.out.println(response.status()); }
测试保存 public static void main(String[] args) throws Exception { // 时间格式转换 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); // 保存数据 Telegraph telegraph = new Telegraph(); telegraph.setAuthor("财政部部长"); telegraph.setTitle("中方正在加快推进各项开放措施的落地"); telegraph.setContent("财联社7月23日讯,财政部部长刘昆指出,中方正在加快推进各项开放措施的落地。中国开放的大门将越开越大,不仅促进中国高质量发展,也为全球经济增长提供重要动力"); telegraph.setPubdate(formatter.format(new Date())); // 对象转换为json字符串 String json = JSONObject.toJSONString(telegraph); System.out.println(json); insertDoc("telegraph", "msg", "1", json); // 使用内置帮助类XContentFactory构造json格式文档 XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("title", "长生生物官网被黑客攻击") .field("author", "长生生物") .field("content", "财联社7月23日讯,长生科技官网首页被黑客攻击,并配图“不搞你,对不起祖国的花朵”。打开长生生物的官网,显示网站已无法打开,错误提示404,截至发稿,网站尚未恢复") .field("pubdate", formatter.format(new Date())).endObject(); System.out.println(builder.string()); insertDoc("telegraph", "msg", "2", builder.string()); }
2.获取文档
根据文档索引、类型、id获取文档内容 /** * 获取文档 * @param indexName * @param typeName * @param id * @return */ public static String getIndexDoc(String indexName, String typeName, String id) { GetResponse response = getClient().prepareGet(indexName, typeName, id).get(); System.out.println(response.getVersion()); return response.getSourceAsString(); }
测试获取文档 public static void main(String[] args) { String source = getIndexDoc("telegraph", "msg", "2"); System.out.println(source); }
3.删除文档 /** * 删除文档 * @param indexName * @param typeName * @param id */ public static void deleteDoc(String indexName,String typeName,String id){ DeleteResponse response = getClient().prepareDelete(indexName, typeName, id).get(); System.out.println(response.status()); }
测试 public static void main(String[] args) { deleteDoc("telegraph", "msg", "1"); }
4.更新文档 /** * 更新文档内容 * * @param indexName * @param typeName * @param id * @param fields * @throws Exception */ public static void updateDoc(String indexName, String typeName, String id, Map fields) throws Exception { UpdateRequest request = new UpdateRequest(); XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); for (String key : fields.keySet()) { builder.field(key, fields.get(key)); } builder.endObject(); request.index(indexName).type(typeName).id(id).doc(builder); UpdateResponse response = getClient().update(request).get(); System.out.println(response.toString()); }
测试 public static void main(String[] args) { Map fields = new HashMap(); fields.put("title", "长生生物涉案狂犬病疫苗被指未作动物实验检测"); try { updateDoc("telegraph", "msg", "2", fields); } catch (Exception e) { e.printStackTrace(); } }
5.文档存在时更新不存在时新增 /** * 如果更新内容不存在,创建文档 * * @param indexName * @param typeName * @param id * @param indexDoc 添加内容 * @param updateField 更新内容 * @throws Exception */ public static void upset(String indexName, String typeName, String id, Map indexDoc, Map updateField) throws Exception { // 创建新增request IndexRequest indexRequest = new IndexRequest(indexName, typeName, id); XContentBuilder indexBuilder = XContentFactory.jsonBuilder().startObject(); for (String key : indexDoc.keySet()) { indexBuilder.field(key, indexDoc.get(key)); } indexBuilder.endObject(); System.out.println(indexBuilder.string()); indexRequest.source(indexBuilder); // 创建更新request UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, id); XContentBuilder updateBuilder = XContentFactory.jsonBuilder().startObject(); for (String key : updateField.keySet()) { updateBuilder.field(key, updateField.get(key)); } updateBuilder.endObject(); System.out.println(updateBuilder.string()); updateRequest.doc(updateBuilder).upsert(indexRequest); UpdateResponse response = getClient().update(updateRequest).get(); System.out.println(response.toString()); }
测试 public static void main(String[] args) throws Exception{ //如果存在就更新 Map fields = new HashMap(); fields.put("title", "被更新之后title"); //如果不存在就新增 Map indexDoc = new HashMap(); indexDoc.put("title","第一次测试添加title"); indexDoc.put("content", "测试添加内容"); upset("telegraph","msg","2",indexDoc,fields); }
大数据
2018-07-23 17:44:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
CentOS7完美安装Hive2.3.0
花费了两天时间,查阅了许多资料,才把hive成功安装完成。下面将和大家分享下安装和注意事项,希望大家可以少走弯路!
大家安装的和修改配置文件的时候,一定要注意自己的安装路径和下载的软件版本,不要盲目操作
1.前期准备
完成hadoop的安装,大家可以参考我的上篇文章进行安装 http://blog.csdn.net/yonjarluo/article/details/77800372
完成mysql的安装,同样可以参考本人的
http://blog.csdn.net/yonjarluo/article/details/77879772

2.下载Hive wget http://mirror.bit.edu.cn/apache/hive/hive-2.3.0/apache-hive-2.3.0-bin.tar.gz
3.解压到指定安装目录 tar -zxvf apache-hive-2.3.0-bin.tar.gz mv apache-hive-2.3.0-bin /usr/local/hive cd /usr/local/hive
4.修改环境变量 vi /etc/profile 插入 export HIVE_HOME=/usr/local/hive export PATH=$HIVE_HOME/bin:$PATH
使其修改立即生效 source /etc/profile
5.登录mysql数据库,并创建metastore数据库,关闭新主库的只读属性,为其授权(用于存储hive的初始化配置)
create database metastore; set global read_only=0; grant all on metastore.* to hive@'%' identified by 'hive'; grant all on metastore.* to hive@'localhost' identified by 'hive'; flush privileges;

当修改数据库密码出现以下报错时:
Your password does not satisfy the current policy requirements set global validate_password_policy=0; set global validate_password_length=4;
如果不关闭数据库的只读属性,执行 grant all on metastore.* to hive@'%' identified by 'hive';时,会报错:


6.下载jdbc connector
点击链接 Connector/J 5.1.44 下载至本地主机,然后再传至 /usr/local/hive/lib

解压,把解压目录下的mysql-connector-java-5.1.44-bin.jar包,拷贝到/usr/local/hive/lib目录下
7.修改hive配置文件 cd /usr/local/hive/conf
复制初始化文件并重命名 cp hive-env.sh.template hive-env.sh cp hive-default.xml.template hive-site.xml cp hive-log4j2.properties.template hive-log4j2.properties cp hive-exec-log4j2.properties.template hive-exec-log4j2.properties
修改hive-env.sh文件
export JAVA_HOME=/usr/local/java/jdk1.8.0_141 ##Java路径,根据自己jdk安装的路径配置 export HADOOP_HOME=/usr/local/hadoop ##Hadoop安装路径 export HIVE_HOME=/usr/local/hive ##Hive安装路径 export HIVE_CONF_DIR=/usr/local/hive/conf ##Hive配置文件路径
启动Hadoop:
在hdfs中创建一下目录,并授权,用于存储文件 hdfs dfs -mkdir -p /user/hive/warehouse hdfs dfs -mkdir -p /user/hive/tmp hdfs dfs -mkdir -p /user/hive/log hdfs dfs -chmod -R 777 /user/hive/warehouse hdfs dfs -chmod -R 777 /user/hive/tmp hdfs dfs -chmod -R 777 /user/hive/log
修改hive-site.xml hive.exec.scratchdir /user/hive/tmp hive.metastore.warehouse.dir /user/hive/warehouse hive.querylog.location /user/hive/log ## 配置 MySQL 数据库连接信息 javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true&characterEncoding=UTF-8&useSSL=false javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName hive javax.jdo.option.ConnectionPassword hive


创建tmp文件 mkdir /home/hadoop/hive-2.3.0/tmp


并在hive-site.xml中修改:
使用vi hive-site.xml,并使用搜索 / 功能
有好几个{system:java.io.tmpdir},把所有的都修改 把{system:java.io.tmpdir} 改成 /home/hadoop/hive-2.3.0/temp/ 把 {system:user.name} 改成 {user.name}
8.初始化hive schematool -dbType mysql -initSchema hive hive

9.启动hive(两种方式)

1.直接输入:hive //直接进入,即可像操作mysql一样(除了delete,update)
2.使用beeline方式:
必须先对hadoop的core-site.xml进行配置,在文件里面添加并保存
hadoop.proxyuser.root.groups * hadoop.proxyuser.root.hosts *

如果没有配置,使用jdbc连接时,会报如下错误:
先启动hiveserver2
hiveserver2
查看hiveserver2启动状态

netstat -nptl | grep 10000
接着启动beeline

beeline
最后使用jdbc连接数据库

!connect jdbc:hive2://localhost:10000 hive hive

以上步骤即完成安装!

注意:
1.hive默认不支持delete,update操作
2.关于并发配置的修改,动态分区的配置都是默认状态
可根据需求,参考官网 hive ,修改hive-site.xml配置文件

本文参考: http://blog.csdn.net/lym152898/article/details/77334997 ,添加了细节部分!
大数据
2018-07-23 16:48:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1、Mapping:
[译]ElasticSearch数据类型--string类型已死, 字符串数据永生
ElasticSearch动态日期映射
2、Spring Data Elasticsearch:
Spring Data Elasticsearch教程
大数据
2018-07-18 09:35:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1.创建索引,准备数据
定义索引结构 DELETE telegraph PUT telegraph { "mappings": { "msg":{ "properties": { "title":{ "type": "text", "analyzer": "ik_max_word" }, "content":{ "type": "text", "analyzer": "ik_max_word" }, "author":{ "type": "text" }, "pubdate":{ "type": "date", "format": "date_hour_minute_second" } } } } }
批量加入测试数据 POST _bulk {"index":{"_index":"telegraph","_type":"msg"}} {"title":"宝泰隆:半年报预增140%-156%","content":"公司主要产品焦炭、甲醇销售量及销售价格较上年同期有较大的上涨","author":"宝泰隆","pubdate":"2018-07-17T17:16:30"} {"index":{"_index":"telegraph","_type":"msg"}} {"title":"周五召开董事会会议 审议及批准更新后的一季报","content":"以审议及批准更新后的2018年第一季度报告","author":"中兴通讯","pubdate":"2018-07-17T12:33:11"} {"index":{"_index":"telegraph","_type":"msg"}} {"title":"长生生物再次跌停 三机构抛售近1000万元","content":"长生生物再次一字跌停,报收19.89元,成交1432万元","author":"长生生物","pubdate":"2018-07-17T10:03:11"} {"index":{"_index":"telegraph","_type":"msg"}} {"title":"碧桂园集团副主席杨惠妍","content":"杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股","author":"小财注","pubdate":"2018-07-17T16:12:55"} {"index":{"_index":"telegraph","_type":"msg"}} {"title":"河北聚焦十大行业推进国际产能合作","content":"河北省政府近日出台积极参与“一带一路”建设推进国际产能合作实施方案","author":"财联社","pubdate":"2018-07-17T14:14:55"}
2.term查询 GET telegraph/_search { "query": { "term": { "title": { "value": "主席" } } } }
查询结果 { "took": 5, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 0.2876821, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "A5etp2QBW8hrYY3zGJk7", "_score": 0.2876821, "_source": { "title": "碧桂园集团副主席杨惠妍", "content": "杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股", "author": "小财注", "pubdate": "2018-07-17T16:12:55" } } ] } }
3.分页
from:起始行
size:返回条数 GET telegraph/_search { "from": 0, "size": 3, "query": { "match_all": {} } }
查询结果 { "took": 6, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 5, "max_score": 1, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "AZetp2QBW8hrYY3zGJk7", "_score": 1, "_source": { "title": "周五召开董事会会议 审议及批准更新后的一季报", "content": "以审议及批准更新后的2018年第一季度报告", "author": "中兴通讯", "pubdate": "2018-07-17T12:33:11" } }, { "_index": "telegraph", "_type": "msg", "_id": "A5etp2QBW8hrYY3zGJk7", "_score": 1, "_source": { "title": "碧桂园集团副主席杨惠妍", "content": "杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股", "author": "小财注", "pubdate": "2018-07-17T16:12:55" } }, { "_index": "telegraph", "_type": "msg", "_id": "AJetp2QBW8hrYY3zGJk7", "_score": 1, "_source": { "title": "宝泰隆:半年报预增140%-156%", "content": "公司主要产品焦炭、甲醇销售量及销售价格较上年同期有较大的上涨", "author": "宝泰隆", "pubdate": "2018-07-17T17:16:30" } } ] } }
4.过滤字段
指定只需要返回的字段值 GET telegraph/_search { "_source": ["title","content"], "query": { "term": { "title": { "value": "主席" } } } }
查询结果 { "took": 13, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 0.2876821, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "A5etp2QBW8hrYY3zGJk7", "_score": 0.2876821, "_source": { "title": "碧桂园集团副主席杨惠妍", "content": "杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股" } } ] } }
5.显示version
设置version字段为true,显示文档版本号 GET telegraph/_search { "_source": "title", "version": true, "query": { "term": { "title": { "value": "主席" } } } }
查询结果 { "took": 8, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 0.2876821, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "A5etp2QBW8hrYY3zGJk7", "_version": 1, "_score": 0.2876821, "_source": { "title": "碧桂园集团副主席杨惠妍" } } ] } }
6.评分过滤
过滤满足最小评分的文档 GET telegraph/_search { "min_score":"0.2", "query": { "term": { "title": { "value": "碧桂园" } } } }
查询结果 { "took": 5, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 0.2876821, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "A5etp2QBW8hrYY3zGJk7", "_score": 0.2876821, "_source": { "title": "碧桂园集团副主席杨惠妍", "content": "杨惠妍分别于7月10日、11日买入碧桂园1000万股、1500万股", "author": "小财注", "pubdate": "2018-07-17T16:12:55" } } ] } }
7.高亮关键字
设置属性,且该属性中有对应查询条件的关键字时高亮显示。 GET telegraph/_search { "query": { "term": { "title": { "value": "会议" } } }, "highlight": { "fields": { "title": {} } } }
查询结果 { "took": 9, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1, "max_score": 0.2876821, "hits": [ { "_index": "telegraph", "_type": "msg", "_id": "AZetp2QBW8hrYY3zGJk7", "_score": 0.2876821, "_source": { "title": "周五召开董事会会议 审议及批准更新后的一季报", "content": "以审议及批准更新后的2018年第一季度报告", "author": "中兴通讯", "pubdate": "2018-07-17T12:33:11" }, "highlight": { "title": [ "周五召开董事会会议 审议及批准更新后的一季报" ] } } ] } }
大数据
2018-07-17 18:25:00