数据专栏

智能大数据搬运工,你想要的我们都有

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

「深度学习福利」大神带你进阶工程师,立即查看>>>
通用解决方案:
response=request.get("url网站")
data=bytes(response.text,response.encoding).decode("gbk","ignore")
大数据
2018-09-15 21:52:07
「深度学习福利」大神带你进阶工程师,立即查看>>> 官方文档 https://lightgbm.readthedocs.io/en/latest/Python-API.html http://lightgbm.apachecn.org/cn/latest/index.html 开源|LightGBM基本原理,以及调用形式 https://blog.csdn.net/HHTNAN/article/details/80068414 如何调参 https://blog.csdn.net/aliceyangxi1987/article/details/80711014
大数据
2018-09-15 17:33:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Rook-Ceph 分布式存储系统快速部署
This guide will walk you through the basic setup of a Ceph cluster and enable you to consume block, object, and file storage from other pods running in your cluster. 官方文档, https://rook.io/docs/rook/v0.8/ceph-quickstart.html Rook 0.8 安装及Dashboard的问题和解决
版本要求
Rook支持的Kubernetes 要求版本高于 v1.7。
预先检查
确保 Kubernetes 集群可以用于 Rook , 跟着这个检查 follow these instructions .
如果使用 dataDirHostPath 来持久化 rook 到 kubernetes 的宿主机,确保 host 至少有 5GB 以上空间可用,而且是指定的目录下。
开始
如果幸运的话,Rook cluster 可以通过下面的kubectl commands来创建。更多细节,参考 deploy the Rook operator 。 cd cluster/examples/kubernetes/ceph kubectl create -f operator.yaml kubectl create -f cluster.yaml
集群运行后,就可以创建 block, object, or file 存储,从而可以被集群中其它的应用所使用。
部署Rook Operator
这是部署Rook system components,的第一步,包括一个Rook agent运行于每一个节点上,作为Rook operator pod,缺省部署在rook-ceph-system命名空间中。 cd cluster/examples/kubernetes/ceph kubectl create -f operator.yaml # verify the rook-ceph-operator, rook-ceph-agent, and rook-discover pods are in the `Running` state before proceeding kubectl -n rook-ceph-system get pod
该operator可以通过 Rook Helm Chart 来进行部署。
重启Kubelet, (K8S 1.7.x only)
对于Kubernetes 1.8之前的版本, Kubelet 进程在所有的节点上都要进行重启,在部署Rook operator 和 Rook agents之后。作为初始化 setup的一部分, Rook agents 部署和配置了Flexvolume plugin ,以此与 Kubernetes’ volume controller框架整合。在 Kubernetes v1.8+, 其 dynamic Flexvolume plugin discovery 将会发现和初始化Rook的plugin, 但在 Kubernetes的老版本上,手工重启Kubelet被要求。
创建Rook Cluster
现在Rook operator, agent,和 discover pods已经运行,我们可以创建 Rook cluster。为了重启之后还能复活集群,请确定设置了 dataDirHostPath 属性。更多的设置,参考 configuring the cluster .
保存下面的参数为 cluster.yaml : apiVersion: v1 kind: Namespace metadata: name: rook-ceph --- apiVersion: ceph.rook.io/v1beta1 kind: Cluster metadata: name: rook-ceph namespace: rook-ceph spec: dataDirHostPath: /var/lib/rook dashboard: enabled: true storage: useAllNodes: true useAllDevices: false config: databaseSizeMB: "1024" journalSizeMB: "1024"
创建cluster: kubectl create -f cluster.yaml
使用 kubectl 列出rook命名空间下的 pods 。其中,osd pods 的数量依赖于集群中节点的数量、配置的设备和目录的数量。 $ kubectl -n rook-ceph get pod NAME READY STATUS RESTARTS AGE rook-ceph-mgr-a-75cc4ccbf4-t8qtx 1/1 Running 0 24m rook-ceph-mon0-72vx7 1/1 Running 0 25m rook-ceph-mon1-rrpm6 1/1 Running 0 24m rook-ceph-mon2-zff9r 1/1 Running 0 24m rook-ceph-osd-id-0-5fd8cb9747-dvlsb 1/1 Running 0 23m rook-ceph-osd-id-1-84dc695b48-r5mhf 1/1 Running 0 23m rook-ceph-osd-id-2-558878cd84-cnp67 1/1 Running 0 23m rook-ceph-osd-prepare-minikube-wq4f5 0/1 Completed 0 24m
Storage
了解Rook提供的三种不同存储类型,查看指南: Block : 创建块存储供pod使用,相当于创建一块硬盘,然后可以被pod挂载到操作系统。 Object : 创建object store可以供 Kubernetes cluster 内部或外部使用,是一种类似于Amazon S3的网络存储服务。 Shared File System : 创建一个文件系统,可以在多个 pods 间共享。
Ceph Dashboard,面板
Ceph 有一个dashboard服务,依赖于mgr的指标收集。可以查看集群的状态,参见 dashboard guide 。也可以通过安装集成的Prometheus和Grafana来查看运行状态。 Dashboard目前(0.8)还会遇到一些问题,参见 Rook 0.8 安装及Dashboard的问题和解决
Tools,客户端工具
我们创建了一个toolbox container,里面包含全功能的Ceph clients命令行工具,可以用于调试和诊断Rook集群的状态和错误。 查看 toolbox readme 获得安装和用法的信息。同时,查看 advanced configuration 文档查看维护和调优的例子。
Monitoring,状态监测
每一个 Rook cluster 都有内置的 metrics collectors/exporters用于 Prometheus 的监测。了解如何设置 monitoring,请查看 monitoring guide 。
结束
测试完集群后,查看 these instructions 可以将其清除。
大数据
2018-09-15 11:44:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、Spark运行环境
   Spark是Scala写的,运行在JVM上,所以运行环境Java7+,如果使用Python API,需要安装Python 2.6+或者Python3.4+,安装之前确认已经安装了Java,可参见本人博客: http://blog.csdn.net/lengconglin/article/details/77016911
  这里注意版本对应问题,2.0以上都对应Scala2.11 Spark1.6.2-Scala 2.10 Spark 2.0.0 -Scala 2.11
二、下载Spark
  下载地址为: http://spark.apache.org/downloads.html
  
   搭建Spark不需要Hadoop,如有hadoop集群,可下载相应的版本
   下载之后解压 cd ~/Downloads/ tar xzvf spark-2.2.0-bin-hadoop2.7.tgz mv spark-2.2.0-bin-hadoop2.7/ spark sudo mv spark/ /usr/lib/
三、配置Spark和.bashrc cd /usr/lib/spark/conf/ cp spark-env.sh.template spark-env.sh gedit spark-env.sh
添加如下两行:
JAVA_HOME=/usr/lib/jdk/jdk1.8.0_144
SPARK_WORKER_MEMORY=4g sudo gedit ~/.bashrc export JAVA_HOME=/usr/lib/jdk/jdk1.8.0_144 export SPARK_HOME=/usr/lib/spark export PATH=$PATH:$JAVA_HOME/bin export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
可以打开终端进行测试,输入spark-shell:
四、IntelliJ IDEA 开发环境搭建
首先下载intellij idea,下载地址为: https://www.jetbrains.com/idea/
加压之后进入bin目录,打开终端运行 ./idea.sh 本人下载的是完全版,所以需要输入注册码:
1. 到网站 http://idea.lanyus.com/ 获取注册码。
2.填入下面的license server:
   http://intellij.mandroid.cn/
   http://idea.imsxm.com/
   http://idea.iteblog.com/key.php
以上方法验证均可以。
可以在安装的时候就选择安装Scala插件,也可以之后在插件里面搜索安装Scala,具体如下:
File -> Settings -> Plugins 搜素Scala,找到插件安装,安装完成之后重启IDE
测试:File ->New -> Project… 弹出的对话框选择Scala -> SBT 之后设置名字,选择JDK,SBT和Scala版本:
点击Finish,然后耐心等待一段时间,后台需要下载相应依赖和代码,所以要保证网络环境良好。

PS:Ubuntu下使用pip安装有BUG存在,修复了BUG依然会因为网络错误安装失败
大数据
2018-09-15 02:17:00
「深度学习福利」大神带你进阶工程师,立即查看>>> conn = pymysql.connect( host = "localhost" , user = "root" , passwd = "123456" , db = "dd" )
python连接mysql数据库问题:
我的代码运行卡在这条代码就执行不下去了!
这条代码本身并没有问题,但是所引用的connection.py包中要设置一下charset的值“utf8”(或者“utf-8”,可能版本不一样会有所不同)
在C:
大数据
2018-09-14 21:17:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、常用操作
1. 查看数据表列表 show tables [like '*name*'];
2. 创建表 内部表和外部表区别:内部表即完全交给hive管理表,在创建时会将数据移动到数据仓库所 在的路径,删除时会删除数据源文件。外部表即增加hive管理的数据文件,创建时需要记录 数据所在的路径,不会移动数据源文件,删除时不会删除数据源文件 创建内部表 create table {tableName}( {columnName} {columnType}, {columnName} {columnType} ) [row format delimited fields terminated by '\t']; 创建外部表 create external table {tableName}( {columnName} {columnType}, {columnName} {columnType} ) [row format delimited fields terminated by '\t' location '{HDFS_path}']; 创建分区表 create [external] table {tableName}( {columnName} {columnType}, {columnName} {columnType} ) partitioned by( {columnName} {columnType}, {columnName} {columnType} ) [row format delimited fields terminated by '\t' location '{HDFS_path}']; 默认分隔符为'\001'
3. 查看表信息 查看表结构 desc {tableName}; 或 {desc formatted}/{describe} {tableName}; 查看分区信息 show partitions {tableName};
4. 数据导入 文件导入 // 从hdfs中导入,是否覆盖 load data inpath '{HDFSPath}' [overwrite] into table {tableName}; // 从linux本地导入,是否覆盖 load data local inpath '{localPath}' [overwrite] into table {tableName}; hive数据查询导入 // insert into与sql语法一致 insert [overwrite] into table {dstTableName} select {needColumns} from {srcTableName};
5. 查看可用函数 // 可用函数列表 show functions; // 查看函数描述信息 desc function {functionName};
二、常用函数
1. 取整函数 四舍五入:round({value}[,precision]) 1~4:舍,5~9:进,传入两个参数时可指定精度 银行家舍入法:bround({value}[,precision]) 1~4:舍,6~9:进,5->前一位是偶数:舍,前一位是奇数:进,传入两个参数时可指定精 度
2. 向下取整函数 floor({value})
3. 向上取整函数 ceil/ceiling({value})
4. 生成随机数 rand([{seed}]):返回一个0到1范围内的随机数,传入参数时可生成稳定的随机数
5. 自然指数函数 自然指数e的n次方:exp({n})
6. 对数函数 以10为底的对数函数:log10({value}) 以2为底的对数函数:log2({value}) 以e为底的对数函数:ln({value}) 对数函数:log({base},{value})
7. 获取日期函数 unix_timestamp() //获取的是时间戳
8. 时间戳转换函数 UNIX时间戳转日期:from_unixtime({unixTime}[,{formatString}]) 日期转UNIX时间戳:unix_timestamp({timeString}[,{formatString}])
9. 日期截取函数 返回日期部分:to_date({timeString}) 返回日期的年:year({timeString}) 返回日期的月:month({timeString}) 返回日期的天:day({timeString}) 返回日期的时:hour({timeString}) 返回日期的分:minute({timeString}) 返回日期的秒:second({timeString}) 返回日期的周:weekofyear({timeString})
10. 日期计算函数 日期比较函数:datediff({endDate},{startDate}) 日期增加函数:date_add({startData},{days}) 日期减少函数:date_sub({startData},{days})
11. 字符串函数 字符串长度函数:length({stringValue}) 字符串反转函数:reverse({stringValue}) 字符串连接函数 无分隔符连接函数:concat({stringValues}) 分隔符连接函数:concat_ws({separator},{stringValues})
12. 字符串截取函数 substr/substring({stringValue},{index}):当index为正数时,截取从index至结尾的字符 串,当index为负数时,截取后index个字符,index的值不能超过字符串长度 substr/substring({stringValue},{index},{length}):截取从index开始,长度为length的字 符,index为正数时,索引从左边开始,index为负数时,索引从右边开始
13. 大小写转换函数 转大写函数:upper/ucase ({stringValue}) 转小写函数:lower/lcase({stringValue})
14. 去空格函数 两边去空格函数:trim({stringValue}) 左边去空格函数:ltrim({stringValue}) 右边去空格函数:rtrim({stringValue})
大数据
2018-09-14 20:27:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、编写系定义函数 以jsonParse(json数据分析)为例 依赖jar包:hive-exec-*.jar 继承UDF 方法名:evaluate import org.apache.hadoop.hive.ql.exec.UDF; import net.sf.json.JSONObject; public class JsonParse extends UDF { public String evaluate(String jsonStr,String key) { return JSONObject.fromObject(jsonStr).get(key).toString(); } }
二、加载自定义函数
1. 导出jar包 只需要勾选src下的内容
2. 传输到Linux系统中去 导出的jar包导入到指定目录 导出jar包的依赖包(在hive-bin/lib下不存在的)需要导入到lib包下
3. 加载jar包 在hive中操作 add jar {path} //path指自己封装的jar包
4. 创建函数(函数只在当前会话下生效,关闭会话即失效) 在hive中操作 create temporary function {functionName} as {classPath}
5. 删除函数 drop temporary function {functionName}
6. 配置文件加载(一般写启动脚本,就不必配置此项) hive-site.xml中配置,可以省略导入jar包操作 hive.aux.jars.path $HIVE_HOME/auxlib
7. 初始化文件加载(执行此步操作就不必执行第6步) 启动hive时指定初始化文件,在文件中添加jar包,创建函数 vi init-hive // 脚本内容 add jar /home/bigdata/UDF.jar; create temporary function myfunc as "com.qfedu.edu.FirstUDF"; // 进入hive界面操作时执行上述脚本 hive -i init-hive
8. HiveJDBC加载方式 在执行操作之前完成函数的初始化操作,生产环境中通常会封装为工具类 ep: public class HiveUtil { static { try { // 1.加载驱动 Class.forName("org.apache.hive.jdbc.HiveDriver"); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public HiveUtil() { open(); Init(); } private void open() { try { // 2.打开连接 Connection connection = DriverManager.getConnection("jdbc:hive2://HADOOP01:10010/"); // 3.获得操作对象 - 会话 statement = connection.createStatement(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void Init() { try { statement.execute("add jar /home/bigdata/udf.jar"); statement.execute("create temporary function sub as 'com.yulang.udf.SubString'"); statement.execute("create temporary function jsonParse as 'com.yulang.udf.JsonParse'"); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
大数据
2018-09-14 20:56:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
一、HWI配置
1. 解压缩源码包 tar -zxvf apache-hive-1.2.2-src.tar.gz 链接: https://pan.baidu.com/s/1J-fbovi8ZprzD_5E9MJhuQ 密码:2nnt
2. 打包应用 cd /home/bigdata/apache-hive-1.2.2-src/hwi jar cvfM hive-hwi-1.2.2.war -C web . // 将web打为JAR包 cp hive-hwi-1.2.2.war $HIVE_HOME/lib/ //拷贝到hive下的lib包下
3. 修改配置文件hive-site.xml cd /home/bigdata/apache-hive-1.2.2-bin/conf/hive-site.xml // 增加如下配置信息 hive.hwi.war.file lib/hive-hwi-1.2.2.war
4. HWI依赖jar包(lib目录) jasper-runtime-*.jar jasper-compiler-*.jar commons-el-1.0.jar tools.jar 链接: https://pan.baidu.com/s/1JfMXdrKwSyph2XYMCoW2nA 密码:7hh2
5. 启动命令 hive --service hwi
二、HWI的使用 浏览数据源 创建操作会话 查询数据生成结果 Result File路径为本地路径(提前创建) Start Query:Yes
三、后台运行 // 步骤一中的启动只在当前会话生效,会话关闭即失效 nohup hive --service hwi > /dev/null 2>&1 &
大数据
2018-09-14 19:24:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
记得刚接触到hadoop的时候跟大部分人一样都会抱怨hadoop的安装部署问题,对于一个新手来说这这的是个头疼的问题,可能需要花费一整天的时间才能把分布式环境安装配置好。在刚接触hadoop的一段时间里,可以说对于hadoop的理解一直都是停留在相对较肤浅的层面。后来随着自己的不断摸索以及向圈内的前辈大神请教交流(主要是向大神请教学来的),自己对于hadoop的认识以及应用也就更加娴熟。
作为一个过来人,在这里给新人分享一些关于hadoop版本选择的问题,希望别像我当时傻乎乎的只知道hadoop有1.0.x和2.x版本。
当前hadoop的发行版本除了Apache的开元版本之外,华为发行版、Intel发行版以及Cloudera发行版等。上面说的这几个第三方的发行版已经有相对较长的一些时间,除此之外还有最近几年异军突起的DKhadoop商业发行版。
国内的大多数公司推出的Hadoop发行版都是收费的,免费的发行版则主要是国外的,比如Apache的发行版、Cloudera发行版等。面对如此多的hadoop版本不免会让人难以选择。下面我们就简单对比一些这些不同版本的优缺点,希望对于新手能够有所帮助。
Apache发行版:
优点: Apache发行版的优点主要集中表现在它的完全开源免费、社区活跃性以及文档、资料详实等方面。
缺点: Apache发行版的缺点也相对较多,具体表现在以下几个方面: 复杂的版本管理。版本管理比较混乱,各种版本层出不穷,让使用者不知所措。 复杂的集群部署、安装、配置。通常按照集群需要编写大量的配置文件,分发到每一台节点上,容易出错,效率低下。 复杂的集群运维。对集群的监控,运维,需要安装第三方的其他软件,如ganglia,nagois等,运维难度较大。 复杂的生态环境。在Hadoop生态圈中,组件的选择、使用,比如Hive,Mahout,Sqoop,Flume,Spark,Oozie等等,需要大量考虑兼容性的问题,版本是否兼容,组件是否有冲突,编译是否能通过等。经常会浪费大量的时间去编译组件,解决版本冲突问题。
第三方发行版本的优缺点:( 如CDH,HDP,MapR等 )
优点: 第三方发行版本的优点主要有以下几个: 基于Apache协议,100%开源; 相较于原生的hadoop在兼容性、安全型以及稳定性方面有所提升; 版本管理清晰,更新更快; 提供了部署、安装、配置工具,大大提高了集群部署的效率,可以在几个小时内部署好集群; 运维简单。提供了管理、监控、诊断、配置修改的工具,管理配置方便,定位问题快速、准确,使运维工作简单,有效。
缺点: 第三方hadoop发行版的主要缺点就是涉及到厂商锁定的问题,但这一问题可以通过技术方面解决掉。
DKhadoop发行版:
Dkhadoop发行版是我目前接触的以及使用一个版本。与市场的其他第三方的发行版本相比较,整合程度要更高,但同样也保持了开源系统的全部优点。综合目前使用的情况看,在性能上相较于以往使用的一些第三方hadoop发行版要提升很多的!关于DKhadoop发行版的问题,感兴趣的可以自己去查阅收集一些资料看看。
大数据
2018-09-14 16:23:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. 概要
前面介绍了如何把Drill部署在YARN上,然后通过Drill-on-YARN客户端,你可以启动、停止、调整、清零命令操作Drill。但是在这么命令背后,到底是如何执行的呢,下面会对Drill-on-YARN的源码进行详细的解析,重点解析启动过程,其他命令简单介绍。
说明:下面涉及到的代码,以drill 1.14.0为准,并且为了减少篇幅,进行了删减。
2. Drill-on-YARN start
2.1 drill-on-yarn.sh
通过查看drill-on-yarn.sh脚本,很容易发现最终执行的java类是 CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}" 。 org.apache.drill.yarn.client.DrillOnYarn 便是启动Drill-on-YARN的入口。我们可以总览一下这个类: public class DrillOnYarn { public static void main(String argv[]) { BasicConfigurator.configure(); ClientContext.init(); run(argv); } public static void run(String argv[]) { ClientContext context = ClientContext.instance(); CommandLineOptions opts = new CommandLineOptions(); if (!opts.parse(argv)) { opts.usage(); context.exit(-1); } if (opts.getCommand() == null) { opts.usage(); context.exit(-1); } try { DrillOnYarnConfig.load().setClientPaths(); } catch (DoyConfigException e) { ClientContext.err.println(e.getMessage()); context.exit(-1); } ClientCommand cmd; switch (opts.getCommand()) { case UPLOAD: cmd = new StartCommand(true, false); break; case START: cmd = new StartCommand(true, true); break; case DESCRIBE: cmd = new PrintConfigCommand(); break; case STATUS: cmd = new StatusCommand(); break; case STOP: cmd = new StopCommand(); break; case CLEAN: cmd = new CleanCommand(); break; case RESIZE: cmd = new ResizeCommand(); break; default: cmd = new HelpCommand(); } cmd.setOpts(opts); try { cmd.run(); } catch (ClientException e) { displayError(opts, e); context.exit(1); } } }
可以看到入口main方法,其中最关键的便是run方法,包含了很多的命令,我们重点看start命令,代码如下: public void run() throws ClientException { checkExistingApp(); dryRun = opts.dryRun; config = DrillOnYarnConfig.config(); FileUploader uploader = upload(); if (launch) { launch(uploader); } }
概括的来说,它主要包含以下流程: 检查application是否已经存在,如果已经存在,便不允许启动,否则执行启动操作(此处检查的application是YARN的application,启动成功会将YARN的applicationId写入本地磁盘的一个文件,通过此文件来检查)。 上传Drill二方包和site目录下的内容至DFS上,其中site目录下的内容会被打包为site.tar.gz public void run() throws ClientException { setup(); uploadDrillArchive(); if (hasSiteDir()) { uploadSite(); } } 启动ApplicationMaster,主要流程为: 创建YARN客户端,并启动 // AMRunner#connectToYarn private void connectToYarn() { System.out.print("Loading YARN Config..."); client = new YarnRMClient(); System.out.println(" Loaded."); } 创建ApplicationMaster // AMRunner#createApp private void createApp() throws ClientException { try { appResponse = client.createAppMaster(); } catch (YarnClientException e) { throw new ClientException("Failed to allocate Drill application master", e); } appId = appResponse.getApplicationId(); System.out.println("Application ID: " + appId.toString()); } 设置ApplicationMaster上下文,包括:Heap memory、Class Path、启动的命令( dirll-am.sh )、启动am容器使用的资源(memory、vCores、disks) 校验资源,主要是ApplicationMaster使用资源是否超过了YARN的设置 提交ApplicationMaster private void launchApp(AppSpec master) throws ClientException { try { client.submitAppMaster(master); } catch (YarnClientException e) { throw new ClientException("Failed to start Drill application master", e); } } 等待启动,并打印启动日志 将ApplicationMaster的appid写入文件(在第1步,检测Application是否存在,就是使用这个文件)
ApplicationMaster启动后,会向RM申请资源,启动Drillbits,下面详细介绍ApplicationMaster启动后的操作
2.2 drill-am.sh
通过查看drill-am.sh脚本,很容易发现最终执行的java类是 AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster" 。 org.apache.drill.yarn.appMaster.DrillApplicationMaste 表示ApplicationMaster执行的入口,下面总览一下这个类: public class DrillApplicationMaster { public static void main(String[] args) { LOG.trace("Drill Application Master starting."); try { DrillOnYarnConfig.load().setAmDrillHome(); } catch (DoyConfigException e) { System.err.println(e.getMessage()); System.exit(-1); } Dispatcher dispatcher; try { dispatcher = (new DrillControllerFactory()).build(); } catch (ControllerFactoryException e) { LOG.error("Setup failed, exiting: " + e.getMessage(), e); System.exit(-1); return; } try { if (!dispatcher.start()) { return; } } catch (Throwable e) { LOG.error("Fatal error, exiting: " + e.getMessage(), e); System.exit(-1); } WebServer webServer = new WebServer(dispatcher); try { webServer.start(); } catch (Exception e) { LOG.error("Web server setup failed, exiting: " + e.getMessage(), e); System.exit(-1); } try { dispatcher.run(); } catch (Throwable e) { LOG.error("Fatal error, exiting: " + e.getMessage(), e); System.exit(-1); } finally { try { webServer.close(); } catch (Exception e) { } } } }
概况的来说,它主要包含以下流程: 加载Drill-on-YARN的配置,并设置AM的DirllHome,比如 /home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0 构造Dispatcher,Dispatcher用于分配YARN、timer、ZooKeeper事件给给集群控制器,它是轻量级多线程的,用于响应RM、NM、timer线程的事件,对于某一个事件,它是连续的,所以需要同步,但是不同类型的事件不需要同步。整个的构造流程如下: 准备资源,包括:drill二方包、site压缩包的目录 private Map prepareResources() { ... drillArchivePath = drillConfig.getDrillArchiveDfsPath(); siteArchivePath = drillConfig.getSiteArchiveDfsPath(); ... } 定义任务启动的规格(TaskSpec),包括:运行时环境、YARN container的规格、dirllbit的规格 private TaskSpec buildDrillTaskSpec(Map resources) throws DoyConfigException { ... ContainerRequestSpec containerSpec = new ContainerRequestSpec(); containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY); ... LaunchSpec drillbitSpec = new LaunchSpec(); ... TaskSpec taskSpec = new TaskSpec(); taskSpec.name = "Drillbit"; taskSpec.containerSpec = containerSpec; taskSpec.launchSpec = drillbitSpec; } 设置Dispatcher的控制器:实现类为ClusterControllerImpl,它主要通过状态来控制Drill集群、调整整个集群的任务(Drill启动、停止等任务)、处理container的回调 public void setYarn(AMYarnFacade yarn) throws YarnFacadeException { this.yarn = yarn; controller = new ClusterControllerImpl(yarn); } 为控制器注册Scheduler,比如DrillbitScheduler,此外Scheduler配置来源于之前drill-on-yarn.conf cluster: [ { name: "drill-group1" type: "basic" count: 1 } ] ... ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0); Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec, pool.getCount(), requestTimeoutSecs, maxExtraNodes); dispatcher.getController().registerScheduler(testGroup); ... 创建ZooKeeper集群协调器 String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT); String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT); String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID); 启动Dispatcher,主要启动AMRMClientAsync、NMClientAsync、YarnClient ... yarn.start(new ResourceCallback(), new NodeCallback()); String url = trackingUrl.replace("", Integer.toString(httpPort)); if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) { url = url.replace("http:", "https:"); } yarn.register(url); controller.started(); ... ... resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback); resourceMgr.init(conf); resourceMgr.start(); ... nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback); nodeMgr.init(conf); nodeMgr.start(); ... client = YarnClient.createYarnClient(); client.init(conf); client.start(); ... 启动dirll运维界面 WebServer webServer = new WebServer(dispatcher); webServer.start(); 运行Dispatcher,主要是启动一个线程,此线程会不断的轮询当前的任务队列中的任务情况,比如启动、停止、resize等类型的任务,然后执行相应的动作,拿启动来说 添加一个启动任务,然后放入pendingTask队列中 if (state == State.LIVE) { adjustTasks(curTime); requestContainers(); } 向RM请求container:创建一个ContainerRequest ContainerRequest request = containerSpec.makeRequest(); resourceMgr.addContainerRequest(containerSpec.makeRequest()); return request; ResourceCallback监听container分配,然后启动container private class ResourceCallback implements AMRMClientAsync.CallbackHandler { @Override public void onContainersAllocated(List containers) { controller.containersAllocated(containers); } } public void containerAllocated(EventContext context, Container container) { Task task = context.task; LOG.info(task.getLabel() + " - Received container: " + DoYUtil.describeContainer(container)); context.group.dequeueAllocatingTask(task); // No matter what happens below, we don't want to ask for this // container again. The RM async API is a bit bizarre in this // regard: it will keep asking for container over and over until // we tell it to stop. context.yarn.removeContainerRequest(task.containerRequest); // The container is need both in the normal and in the cancellation // path, so set it here. task.container = container; if (task.cancelled) { context.yarn.releaseContainer(container); taskStartFailed(context, Disposition.CANCELLED); return; } task.error = null; task.completionStatus = null; transition(context, LAUNCHING); // The pool that manages this task wants to know that we have // a container. The task manager may want to do some task- // specific setup. context.group.containerAllocated(context.task); context.getTaskManager().allocated(context); // Go ahead and launch a task in the container using the launch // specification provided by the task group (pool). try { context.yarn.launchContainer(container, task.getLaunchSpec()); task.launchTime = System.currentTimeMillis(); } catch (YarnFacadeException e) { LOG.error("Container launch failed: " + task.getContainerId(), e); // This may not be the right response. RM may still think // we have the container if the above is a local failure. task.error = e; context.group.containerReleased(task); task.container = null; taskStartFailed(context, Disposition.LAUNCH_FAILED); } } NodeCallback监听container启动 public class NodeCallback implements NMClientAsync.CallbackHandler { @Override public void onStartContainerError(ContainerId containerId, Throwable t) { controller.taskStartFailed(containerId, t); } @Override public void onContainerStarted(ContainerId containerId, Map allServiceResponse) { controller.containerStarted(containerId); } @Override public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { } @Override public void onGetContainerStatusError(ContainerId containerId, Throwable t) { } @Override public void onStopContainerError(ContainerId containerId, Throwable t) { controller.stopTaskFailed(containerId, t); } @Override public void onContainerStopped(ContainerId containerId) { controller.containerStopped(containerId); } }
2.3 fail over
Drill-on-YARN除了提供start、stop、resize功能外,还提供了fail over功能,当前某个drillbit挂掉后,Drill-on-YARN会尝试再次启动drillbit,目前重试的次数为2。此外,如果一个drillbit所在的节点频繁挂掉,会被列入黑名单。
我们可以通过手动kill drillbit来模拟drillbit挂掉的情况,然后等待一会儿,可以看到,drillbit进程重新启动了。下面我们看看,代码的执行流程 drillbit挂掉,container结束 private class ResourceCallback implements AMRMClientAsync.CallbackHandler { @Override public void onContainersCompleted(List statuses) { controller.containersCompleted(statuses); } } retry task:重新将这个task加入pendingTasks,然后轮询的线程检测到pendingTasks不为空,执行启动操作 protected void taskTerminated(EventContext context) { Task task = context.task; context.getTaskManager().completed(context); context.group.containerReleased(task); assert task.completionStatus != null; // container结束的状态不是0,说明不是正常结束 if (task.completionStatus.getExitStatus() == 0) { taskEnded(context, Disposition.COMPLETED); context.group.taskEnded(context.task); } else { taskEnded(context, Disposition.RUN_FAILED); retryTask(context); } } private void retryTask(EventContext context) { Task task = context.task; assert task.state == END; if (!context.controller.isLive() || !task.retryable()) { context.group.taskEnded(task); return; } if (task.tryCount > task.taskGroup.getMaxRetries()) { LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount); task.disposition = Disposition.TOO_MANY_RETRIES; context.group.taskEnded(task); return; } LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount); context.group.taskRetried(task); task.reset(); transition(context, START); context.group.enqueuePendingRequest(task); }
3. 停止
除了前面详情介绍的start命令外,Drill-on-YARN也提供了stop命令,其中stop分两种: 强制停止:直接调用yarn客户端的killApplication api yarnClient.killApplication(appId); 优雅停止:先清理所有的任务,包括pending、running的,然后调用yarn的api杀死容器,关闭controller,然后通知am运行结束 ... for (Task task : getStartingTasks()) { context.setTask(task); context.getState().cancel(context); } for (Task task : getActiveTasks()) { context.setTask(task); context.getState().cancel(context); } ... ... context.yarn.killContainer(task.container); ... public void run() throws YarnFacadeException { ... boolean success = controller.waitForCompletion(); ... ... finish(success, null); ... } public boolean waitForCompletion() { start(); synchronized (completionMutex) { try { completionMutex.wait(); } catch (InterruptedException e) { } } return succeeded(); } public void finish(boolean succeeded, String msg) throws YarnFacadeException { nodeMgr.stop(); String appMsg = "Drill Cluster Shut-Down"; FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED; if (!succeeded) { appMsg = "Drill Cluster Fatal Error - check logs"; status = FinalApplicationStatus.FAILED; } if (msg != null) { appMsg = msg; } try { resourceMgr.unregisterApplicationMaster(status, appMsg, ""); } catch (YarnException | IOException e) { throw new YarnFacadeException("Deregister AM failed", e); } resourceMgr.stop(); }
4. resize
resize流程为:调整quantity(保留多少个container),之后轮询线程会根据quantity,调整任务,执行resize操作 public int resize(int level) { int limit = quantity + state.getController().getFreeNodeCount() +maxExtraNodes; return super.resize( Math.min( limit, level ) ); }
5. 总结
总的来说,Drill-on-YARN分为两大模块,drill-on-yarn.sh和drill-am.sh。drill-on-yarn.sh用于启动ApplicationMaster,drill-am.sh用于向ResourceManager申请资源并启动Drill集群。其中Drill的启动、停止、缩容、扩容,都被封装为一个任务,在执行这些命令时,会构建一个任务,放入任务队列中。有一个线程会一直轮询此队列,根据队列中的任务执行不同的操作,从而达到启动、停止、缩容、扩容Drill集群的功能。此外,相比独立部署,Drill-on-YARN提供的failover功能强化了Drill的稳定性。
大数据
2018-09-14 15:40:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
本系列共两篇文章,会探讨如何将Ignite和Spark进行集成。
Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。
Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。
因此,这两种技术是可以互补的。
将Ignite与Spark整合
整合这两种技术会为Spark用户带来若干明显的好处: 通过避免大量的数据移动,获得真正可扩展的内存级性能; 提高RDD、DataFrame和SQL的性能; 在Spark作业之间更方便地共享状态和数据。
下图中显示了如何整合这两种技术,并且标注了显著的优势: 在本系列的第一篇文章中会聚焦于Ignite RDD,在第二篇文章中会聚焦于Ignite DataFrame。
Ignite RDD
Ignite提供了一个SparkRDD的实现,叫做IgniteRDD,这个实现可以在内存中跨Spark作业共享任何数据和状态,IgniteRDD为Ignite中相同的内存数据提供了一个共享的、可变的视图,它可以跨多个不同的Spark作业、工作节点或者应用,相反,原生的SparkRDD无法在Spark作业或者应用之间进行共享。
IgniteRDD作为Ignite分布式缓存的视图,既可以在Spark作业执行进程中部署,也可以在Spark工作节点中部署,也可以在它自己的集群中部署。因此,根据预配置的部署模型,状态共享既可以只存在于一个Spark应用的生命周期的内部(嵌入式模式),或者也可以存在于Spark应用的外部(独立模式)。
Ignite还可以帮助Spark用户提高SQL的性能,虽然SparkSQL支持丰富的SQL语法,但是它没有实现索引。从结果上来说,即使在普通的较小的数据集上,Spark查询也可能花费几分钟的时间,因为需要进行全表扫描。如果使用Ignite,Spark用户可以配置主索引和二级索引,这样可以带来上千倍的性能提升。
IgniteRDD示例
下面通过一些代码以及创建若干应用的方式,演示如何使用IgniteRDD以及看到它的好处,代码的完整版本,可以从 GitHub 上进行下载。
代码共包括两个简单的Scala应用和两个Java应用。这是为了说明可以使用多种语言来访问Ignite RDD,这在使用不同编程语言和框架的组织中可能存在这样的场景。此外,会从两个不同的环境运行应用:从终端运行Scala应用以及通过IDE运行Java应用。作为一个花絮,还会在Java应用程序中运行一些SQL代码。
对于Scala应用,一个应用会用于往IgniteRDD中写入部分数据,而另一个应用会执行部分过滤然后结果集。使用Maven将代码构建为一个jar文件后在终端窗口中执行这个程序,下面是详细的代码: object RDDWriter extends App { val conf = new SparkConf().setAppName("RDDWriter") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i))) ic.close(true) sc.stop() } object RDDReader extends App { val conf = new SparkConf().setAppName("RDDReader") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500) println("The count is " + greaterThanFiveHundred.count()) ic.close(true) sc.stop() }
在这个Scala的RDDWriter中,首先创建了包含应用名的 SparkConf ,之后基于这个配置创建了 SparkContext ,最后,根据这个 SparkContext 创建一个 IgniteContext 。创建 IgniteContext 有很多种方法,本例中会使用一个叫做 example-shared-rdd.xml 的XML文件,该文件会结合Ignite发行版然后根据需求进行了预配置。显然,需要根据自己的环境修改路径(Ignite主目录),之后指定IgniteRDD持有的整数值元组,最后,将从1到1000的整数值存入IgniteRDD,数值的存储使用了10个parallel操作。
在这个Scala的RDDReader中,初始化和配置与Scala RDDWriter相同,也会使用同一个xml配置文件,应用会执行部分过滤,然后关注存储了多少大于500的值,答案最后会输出出来。
关于 IgniteContext 和 IgniteRDD 的更多信息,可以看Ignite的 文档 。
要构建jar文件,可以使用下面的maven命令: mvn clean install
接下来,看下Java代码,先写一个Java应用往IgniteRDD中写入多个元组,然后另一个应用会执行部分过滤然后返回结果集,下面是RDDWriter的代码细节: public class RDDWriter { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDWriter") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext igniteContext = new JavaIgniteContext( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD sharedRDD = igniteContext.fromCache("sharedRDD"); List data = new ArrayList<>(20); for (int i = 1001; i <= 1020; i++) { data.add(i); } JavaRDD javaRDD = sparkContext.parallelize(data); sharedRDD.savePairs(javaRDD.mapToPair(new PairFunction() { public Tuple2 call(Integer val) throws Exception { return new Tuple2(val, val); } })); igniteContext.close(true); sparkContext.close(); } }
在这个Java的RDDWriter中,首先创建了包含应用名和执行器数量的 SparkConf ,之后基于这个配置创建了 SparkContext ,最后,根据这个 SparkContext 创建一个 IgniteContext 。创建 IgniteContext 有很多种方法,本例中会使用一个叫做 example-shared-rdd.xml 的XML文件,该文件会结合Ignite发行版然后根据需求进行了预配置。显然,需要根据自己的环境修改路径(Ignite主目录),最后,往IgniteRDD中添加了额外的20个值。
在这个Java的RDDReader中,初始化和配置与Java RDDWriter相同,也会使用同一个xml配置文件,应用会执行部分过滤,然后关注存储了多少大于500的值,答案最后会输出出来,下面是Java RDDReader的代码: public class RDDReader { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDReader") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext igniteContext = new JavaIgniteContext( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD sharedRDD = igniteContext.fromCache("sharedRDD"); JavaPairRDD greaterThanFiveHundred = sharedRDD.filter(new Function, Boolean>() { public Boolean call(Tuple2 tuple) throws Exception { return tuple._2() > 500; } }); System.out.println("The count is " + greaterThanFiveHundred.count()); System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10"); df.show(); igniteContext.close(true); sparkContext.close(); } }
最后,马上就可以对代码进行测试了。
运行这个应用
在第一个终端窗口中,启动Spark的主节点,如下: $SPARK_HOME/sbin/start-master.sh
在第二个终端窗口中,启动Spark工作节点,如下: $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port
根据自己的环境,修改IP地址和端口号(ip:port)。
在第三个终端窗口中,启动一个Ignite节点,如下: $IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml
这里使用了之前讨论过的 example-shared-rdd.xml 文件。
在第四个终端窗口中,可以运行Scala版的RDDWriter应用,如下: $SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"
根据自己的环境修改IP地址和端口(ip:port),以及jar文件的路径(/path_to_jar_file)。
会产生如下的输出: The count is 500
这是我们期望的值。
接下来,杀掉Spark的主节点和工作节点,而Ignite节点仍然在运行中并且IgniteRDD对于其他应用仍然可用,下面会使用IDE通过Java应用接入IgniteRDD。
运行Java版RDDWriter会扩展之前存储于IgniteRDD中的元组列表,通过运行Java版RDDReader可以进行测试,它会产生如下的输出: The count is 520
这也是我们期望的。
最后,SQL查询会在IgniteRDD中执行一个SELECT语句,返回范围在10到100之间的最初10个值,输出如下: +----+ |_VAL| +----+ | 11| | 12| | 13| | 14| | 15| | 16| | 17| | 18| | 19| | 20| +----+
结果正确。
总结
本文中,看到了如何从多个环境中使用多个编程语言轻松地访问IgniteRDD。可以对IgniteRDD进行数据的读写,并且即使Spark已经关闭状态也通过Ignite得以保持,因此可以看到,这为Spark用户带来了很大的灵活性和好处。
在本系列的下一篇文章中,会看到Ignite和Spark整合之后的Ignite DataFrames及其优势。
大数据
2018-09-13 23:45:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
声明: 此篇文章主要是观看静觅教学视频后做的笔记,原教程地址 https://cuiqingcai.com/
实现流程介绍
1.抓取索引页内容:利用requests请求目标站点,得到索引网页HTML代码,返回结果
2.抓取详情页内容:解析返回结果,得到详情页的链接,并进一步抓取详情页信息
3.下载图片与保存数据库:将图片下载到本地,并把页面信息及图片URL保存至MongoDB
4.开启循环及多线程:对多页内容遍历,开启多线程提高抓取速度
具体实现
1. 首先访问今日头条网站输入关键字来到索引页,我们需要通过分析网站来拿到进入详细页的url
经过观察可以发现每次滑动鼠标滚轮,新的标题链接就会被显示,所以可以发现其后台为Ajax请求,通过浏览器Network选项卡的XHR可以找到Ajax的链接,其为一个json数据,以搜索词街拍为例,其链接地址如下: https://www.toutiao.com/search_content/?offset=0&format=json&keyword=%E8%A1%97%E6%8B%8D&autoload=true&count=20&cur_tab=1&from=search_tab
2.通过点击查看Query String Parameters中的内容,可以看到一些类似字典的数据,所以这是一会需要通过urlencode来转码拼接成最终访问的地址 offset: 0 format: json keyword: 街拍 autoload: true count: 20 cur_tab: 1 from: search_tab
3.随着向下滑动滚动条显示更多的图片索引,会发现刷出了很多新的ajax请求,通过这个我们可以知道我们之后可以通过改变offset参数来获取不同的拿到不同的索引界面,从而获得不同的图集详细页url。开始只需实现一个offset参数的爬取,最后通过进程池Pool来创建实现多进程爬取不同offset参数的URL,加快爬取速度
4.接下来就是分析查找图集详细页的代码,来找到图片的url,这个图片url隐藏的比较深,都在JS代码中,所以不能使用BeautifulSoup和PyQuery来解析了,只能通过正则解析,使用正则解析要注意匹配规则一定要写对。刷新页面后,自己基础比较差,找了好久换了火狐浏览器,又换回谷歌,最后在Network选项卡的Doc发现下面这个链接,而图片地址就藏在gallery: JSON.parse里 https://www.toutiao.com/a6585311263927042573/
5.代码实现
代码直接进行展示吧,需要的注释我已经写在代码里了,先编辑一个config.py的文件,里面设置了代码中用到的变量 MONGO_URL = 'localhost' MONGO_DB = 'toutiao' MONGO_TABLE = 'toutiao' GROUP_START = 1 GROUP_END = 20 KEYWORD='街拍' #!/usr/bin/env python # coding=utf-8 from urllib.parse import urlencode from requests.exceptions import ConnectionError from bs4 import BeautifulSoup from json.decoder import JSONDecodeError from hashlib import md5 from config import * from multiprocessing import Pool import requests import json import re import os import pymongo client = pymongo.MongoClient(MONGO_URL, connect=False) db = client[MONGO_DB] def get_page_index(url, headers): """ 作用:返回页面源码 url:请求地址 headers:请求头信息 """ try: response = requests.get(url, headers=headers) # 判断是否访问成功 if response.status_code == 200: return response.text except ConnectionError: print('Erroe occured') return None def parse_page_index(html): """ 作用:解析出标题URL地址 html:网页源码 """ try: # 将数据转为json格式 data = json.loads(html) # print(data) # 判断data是否为空,以及data字典中是否有data这个键 if data and 'data' in data.keys(): for item in data.get('data'): if item.get('article_url'): yield item.get('article_url') except JSONDecodeError: pass def get_page_detail(url, headers): """ 作用:返回标题URL网页源码 url:标题URL地址 headers:请求头信息 """ try: response = requests.get(url, headers=headers) # 判断是否访问成功 if response.status_code == 200: return response.text except ConnectionError: print('Error occured') return None def parse_page_detail(html, url): """ 作用:解析标题URL地址的每个图片链接 html:标题URL网页源码 url:标题URL地址 """ # 利用BeautifulSoup找到title的文本 soup = BeautifulSoup(html, 'lxml') title = soup.title.text # 利用正则找到每个下载图片的地址 images_pattern = re.compile('gallery: JSON.parse\("(.*)"\)', re.S) result = images_pattern.search(html) # print(result) if result: data = json.loads(result.group(1).replace('\\', '')) # 提取出sub_images键的键值 if data and 'sub_images' in data.keys(): sub_images = data.get('sub_images') # 使用列表生成式拿到每个图片URL images = [item.get('url') for item in sub_images] for image in images: # 下载图片 download_image(image) # 将return的结果保存至MongoDB中 return { 'title': title, 'url': url, 'images': images } def download_image(url): """ 作用:返回图片URL源码 url:图片URL地址 """ print('Downloading', url) try: response = requests.get(url) # 判断是否访问成功 if response.status_code == 200: save_image(response.content) return None except ConnectionError: return None def save_image(content): """ 作用:保存图像文件 content:图像二进制数据 """ # 使用md5加密内容,生成图像名称 file_path = '{0}/{1}.{2}'.format(os.getcwd(), md5(content).hexdigest(), 'jpg') print(file_path) # 判断该文件名是否存在 if not os.path.exists(file_path): with open(file_path, 'wb') as f: f.write(content) f.close() def save_to_mongo(result): """ 作用:保存数据至MongoDB数据库 result:包括图片标题,请求地址,图像地址 """ if db[MONGO_TABLE].insert(result): print('Successfully Saved to Mongo', result) return True return False def jiepai_Spider(offset): """ 作用:整个爬虫调度器 offset:位置参数 """ headers = { 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36' } data = { "offset": offset, "format": "json", "keyword": "街拍", "autoload": "true", "count": "20", "cur_tab": "1", "from": "search_tab" } # 通过urlencode构造请求URL url = 'https://www.toutiao.com/search_content/' + '?' + urlencode(data) # 测试url # print(url) # 获取页面源码 html = get_page_index(url, headers) # 解析HTML,获得链接地址 for url in parse_page_index(html): # print(url) # 获得每个链接地址的HTML html = get_page_detail(url, headers) result = parse_page_detail(html, url) # 判断result是否为空,保存至MongoDB数据库中 if result: save_to_mongo(result) if __name__ == "__main__": # 创建进程池 pool = Pool() groups = ([x * 20 for x in range(GROUP_START, GROUP_END + 1)]) pool.map(jiepai_Spider, groups) pool.close() # 等待pool中所有子进程执行完成,必须放在close语句之后 pool.join()
总结思考
1.在利用正则进行匹配的时候如果原文有‘(“ ”)',这类符号时那么你在进行正则表达式书写的时候应该在前面加'\'。按理应该也可以使用原始字符串r,可是我用完最后在匹配的时候返回的是None,留个疑问 pattern = re.compile('gallery: JSON\.parse\("(.*?)"\),', re.S)
2. db = client[MONGO_DB]这里应该是方括号而不是 ( ),否则无法正常访问数据库
3. 在Google浏览器中找不到图片url,然后使用的是火狐浏览器来回查看
4.完整源码地址: https://github.com/XiaoFei-97/toutiao_Spider-Ajax
原文出处: https://www.jzfblog.com/detail/66 ,文章的更新编辑以此链接为准。欢迎关注源站文章!
大数据
2018-09-13 21:09:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
声明 :此篇文章主要是观看静觅教学视频后做的笔记,原教程地址: https://cuiqingcai.com/
流程框架
1.抓取单页内容:利用requests请求目标站点,得到单个网页HTML代码,返回结果。
2.正则表达式分析:根据HTML代码分析得到电影的名称,主演,上映时间,评分,图片链接等信息
3.开启循环及多线程:对多页内容遍历,开启多线程提高抓取速度
4.保存至文件:通过文件的形式将结果保存,每一部电影一个结果一行Json字符串
流程设计
1.maoyan_Spider函数是一个整体的爬虫调度器,其中包含了请求的url地址,headers请求头 def maoyan_Spider(offset): """ 作用:猫眼电影调度器 offset:get的页码参数 """ url = 'http://maoyan.com/board/4?offset=' + str(offset) headers = { 'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36' } # 获取单页html html = get_one_page(url, headers) # print(html) # 将每一页的电影信息解析,并写入文件中 for item in parse_one_page(html): # print(item) item_str = json.dumps(item, ensure_ascii=False) write_to_file(item_str)
2.首先以单个网页源代码进行分析,进入到猫眼电影官网榜单,通过构造请求可以拿到http://maoyan.com/board/4?的源码 def get_one_page(url, headers): """ 作用:获取一页的源码 url:请求地址 headers:请求头 """ try: response = requests.get(url, headers=headers) if response.status_code == 200: return response.text return None except RequestException: return None
3.接下来感觉整个爬虫的难点就在于解析,源码有了,这里我使用的还是正则来提取数据,稍有字符写错,可能就解析不出来了。下面以Top榜第一名为例进行解析,并转化为json格式保存
1 霸王别姬

霸王别姬

主演:张国荣,张丰毅,巩俐

上映时间:1993-01-01(中国香港)

9.6


可以得出其字符串匹配规则是 '
.*?board-index.*?>(.*?).*?data-src="(.*?)".*?name">(.*?).*?star">(.*?)

.*?releasetime">(.*?)

.*?integer">(.*?).*?fraction">(.*?)'
4.然后通过使用re模块findall方法找到所有电影信息,电影信息包含排名,图片地址,电影名称,主演,上映时间,评分。findall方法返回的是一个元素为元组的列表,然后遍历这些元素通过yield生成json字典形式 def parse_one_page(html): """ 作用:解析一页的源码 html:网页源码 """ pattern = re.compile('
.*?board-index.*?>(.*?).*?data-src="(.*?)".*?name">(.*?).*?star">(.*?)

.*?releasetime">(.*?)

.*?integer">(.*?).*?fraction">(.*?)', re.S) items = pattern.findall(html) # print(items) for item in items: yield { 'index': item[0], 'image': item[1], 'title': item[2], 'actor': item[3].strip()[3:], 'time': item[4].strip()[5:], 'score': item[5]+item[6] }
5.最后是写入文件,因为此处是对每个电影信息进行写入,所以注意写入方法用的是a,不是w。否则写入的内容只有一个电影信息,也就是Top100 def write_to_file(item): """ 作用:往文件中写入内容 item:处理后的单个电影信息 """ with open('result.txt', 'a', encoding='utf-8') as f: f.write(item + '\n')
6.完整源码地址: https://github.com/XiaoFei-97/maoyan_Spider-Top100
原文出处: https://www.jzfblog.com/detail/64 ,文章的更新编辑以此链接为准。欢迎关注源站文章!
大数据
2018-09-13 19:31:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
内涵段子前段时间发生的事情,想必大家都有所耳闻,我本人也是非常喜欢看段子的。但这并不能阻止我想看段子的脚步,幸好,给大家推荐一个网站,叫做 内涵吧 ,里面也有非常丰富的段子内容。下面就以 内涵吧 为例爬取所有段子,可供离线观看。
先分析请求的url地址
1.第一页段子url: https://www.neihan8.com/wenzi/index.html
2.第二页段子url: https://www.neihan8.com/wenzi/index.html
3.第三页段子url: https://www.neihan8.com/wenzi/index.html
4.那么就可以得出一个规律,也就是第n页段子url: https://www.neihan8.com/wenzi/index.html
如果要设计一个较为人性化的使用方式,可以让用户自己设定爬取页码的开始和结束,部分代码如下所示: start_page = int(raw_input("请输入您要下载的起始页:")) end_page = int(raw_input("请输入您要下载的结束页:")) for page in range(start_page, end_page+1): if page == 1: url = "https://www.neihan8.com/wenzi/index.html" else: url = "https://www.neihan8.com/wenzi/index_" + str(page) + ".html"
整理逻辑功能
通过第一步已经拿到了每页的url,而每页内容都是一个段子列表,每页至少20个段子,这些列表包含段子的标题和每个段子的url,所以要提取出来,然后逐个读取。再将读取的内容与标题拼接成字符串,通过文件的读写功能写入每一个文件中
1.下载每页所有的段子 def loadPage(self, page, url): """ 作用:下载该页所有的段子 page:当前下载的页码 url:当前页的url地址 """ print "*"*50 print "正在下载第%s页"%page request = urllib2.Request(url, headers=self.headers) response = urllib2.urlopen(request) html = response.read() # print html return html
2.提取出url和标题 def dealPage(self, page, html): """ 作用:处理当前页面的段子 page:当前处理的页码 html:当前页面的html内容 """ print "*"*50 print "正在处理第%s页"%page pattern = re.compile(r'

(.*?)

', re.S) urls_names = pattern.findall(html) duanzi_dict = self.dealEach(urls_names) return duanzi_dict
3.使用正则解析每个段子的url def dealEach(self, urls_names): """ 作用:处理每个段子的内容 urls_names:每个段子的url和标题 """ duanzi_names = [] duanzi_contents = [] duanzi_content = '' for url,name in urls_names: duanzi_url = "https://www.neihan8.com" + url # print duanzi_url request = urllib2.Request(duanzi_url, headers=self.headers) response = urllib2.urlopen(request) html = response.read() # print html pattern = re.compile(r'

(.*?)

') duanzi_content = ''.join(pattern.findall(html)).replace("“","").replace("”","").replace("…","").replace(""","").replace("…","").replace(" ","") # print duanzi_content duanzi_contents.append(duanzi_content) duanzi_names.append(name) duanzi_dict = dict(zip(duanzi_names, duanzi_contents)) return duanzi_dict
4.将段子拼接并写入文件 def writePage(self, page, duanzi_dict): """ 作用:写入该页所有的段子 duanzi_dict:该页所有的段子标题与内容 """ print "*"*50 print "正在写入第%s页"%page print "*"*50 num = 1 with open("第%s页"%page, "w") as f: for duanzi_name in duanzi_dict: print "正在记录第%s个段子"%num f.write(duanzi_name + '\n' + duanzi_dict[duanzi_name]+ '\n' + '\n' + '\n') num += 1
全部代码 #!/usr/bin/env python # coding=utf-8 import urllib2 import re class Spider(object): def __init__(self): self.headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36"} def loadPage(self, page, url): """ 作用:下载该页所有的段子 page:当前下载的页码 url:当前页的url地址 """ print "*"*50 print "正在下载第%s页"%page request = urllib2.Request(url, headers=self.headers) response = urllib2.urlopen(request) html = response.read() # print html return html def dealPage(self, page, html): """ 作用:处理当前页面的段子 page:当前处理的页码 html:当前页面的html内容 """ print "*"*50 print "正在处理第%s页"%page pattern = re.compile(r'

(.*?)

', re.S) urls_names = pattern.findall(html) duanzi_dict = self.dealEach(urls_names) return duanzi_dict def dealEach(self, urls_names): """ 作用:处理每个段子的内容 urls_names:每个段子的url和标题 """ duanzi_names = [] duanzi_contents = [] duanzi_content = '' for url,name in urls_names: duanzi_url = "https://www.neihan8.com" + url # print duanzi_url request = urllib2.Request(duanzi_url, headers=self.headers) response = urllib2.urlopen(request) html = response.read() # print html pattern = re.compile(r'

(.*?)

') duanzi_content = ''.join(pattern.findall(html)).replace("“","").replace("”","").replace("…","").replace(""","").replace("…","").replace(" ","").replace("—","") # print duanzi_content duanzi_contents.append(duanzi_content) duanzi_names.append(name) duanzi_dict = dict(zip(duanzi_names, duanzi_contents)) return duanzi_dict def writePage(self, page, duanzi_dict): """ 作用:写入该页所有的段子 duanzi_dict:该页所有的段子标题与内容 """ print "*"*50 print "正在写入第%s页"%page print "*"*50 num = 1 with open("第%s页.txt"%page, "w") as f: for duanzi_name in duanzi_dict: print "正在记录第%s个段子"%num f.write(duanzi_name + '\n' + duanzi_dict[duanzi_name]+ '\n' + '\n' + '\n') num += 1 def neihan(self): list_urls = [] start_page = int(raw_input("请输入您要下载的起始页:")) end_page = int(raw_input("请输入您要下载的结束页:")) if start_page <=0 or end_page <=0: print "当前输入不合法" return for page in range(start_page, end_page+1): if page == 1: url = "https://www.neihan8.com/wenzi/index.html" else: url = "https://www.neihan8.com/wenzi/index_" + str(page) + ".html" # print url html = self.loadPage(page, url) duanzi_dict = self.dealPage(page, html) self.writePage(page, duanzi_dict) if __name__ == "__main__": neihan_Spider = Spider() neihan_Spider.neihan()
原文出处: https://www.jzfblog.com/detail/32 ,文章的更新编辑以此链接为准。欢迎关注源站文章!
大数据
2018-09-13 19:21:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
今天是学习爬虫的第二天了,看了网上很多初学者都喜欢爬取字典翻译。确实,有一个自己随时随地使用的翻译神器,省去了打开浏览器输入网址的那些繁琐,也腾出了不少时间。在这里我选择的是有道翻译,相比于百度翻译来说,它的特点就是使用了post请求来获取json格式数据,而百度翻译使用的是get请求。因为博主暂时还处于爬虫的入门阶段,而网上的入门资料和图书馆借阅的书籍还是python2相对多一些,所以我使用的库也是urllib2和urllib,但后面还是肯定会转到python3,毕竟python2到2020年官方就会停止支持了,还有一年多的时间。
1.分析post请求的URL: http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule
2.查看Form Data
其为post请求的数据,将其粘帖到sublime编辑器下,使用正则替换,将其转化为字典的形式 "i": "python", "from": "AUTO", "to": "AUTO", "smartresult": "dict", "client": "fanyideskweb", "salt": "1529759898442", "sign": "8fa19ef594cd75bff554ef1f03dc5901", "doctype": "json", "version": "2.1", "keyfrom": "fanyi.web", "action": "FY_BY_REALTIME", "typoResult": "false",
3.代码实现 ​ #!/usr/bin/env python # coding=utf-8 import urllib import urllib2 def youdao(): # 构建url链接 # url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule' # 这里要去掉?号前面的_o,不然会进行加密算法,导致失败 url = 'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule' # 构建请求头 headers = { "User-Agent" : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:59.0) Gecko/20100101 Firefox/59.0' } words = raw_input("请输入要翻译的内容:") # 构建请求体 format_data = { 'i': words, 'from':'AUTO', 'to':'AUTO', 'smartresult':'dict', 'client':'fanyideskweb', 'salt':'1526368137702', 'sign':'f0cd13ef1919531ec9a66516ceb261a5', 'doctype':'json', 'version':'2.1', 'keyfrom':'fanyi.web', 'action':'FY_BY_REALTIME', 'typoResult':'false' } # 进行url编码 format_data = urllib.urlencode(format_data) # 获取request文件(传入了data参数,就是post请求) request = urllib2.Request(url, data = format_data, headers = headers) # 打开请求文件 response = urllib2.urlopen(request) # 读取文件内容 content = response.read() content = eval(content) ret = content["translateResult"][0][0]['tgt'] print(ret) #return ret if __name__ == "__main__": youdao() ​
原文出处: https://www.jzfblog.com/detail/16 ,文章的更新编辑以此链接为准。欢迎关注源站文章!
大数据
2018-09-13 19:13:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
经常有人拿 Ignite 和 Spark 进行比较,然后搞不清两者的区别和联系。Ignite 和 Spark,如果笼统归类,都可以归于内存计算平台,然而两者功能上虽然有交集,并且 Ignite 也会对 Spark 进行支持,但是不管是从定位上,还是从功能上来说,它们差别巨大,适用领域有显著的区别。本文从各个方面对此进行对比分析,供各位技术选型参考。
一、综述
Ignite 和 Spark 都为 Apache 的顶级开源项目,遵循 Apache 2.0 开源协议,经过多年的发展,二者都已经脱离了单一的技术组件或者框架的范畴,向着多元化的生态圈发展,并且发展速度都很快。
Ignite
Ignite 技术来源于 GridGain 公司的商业产品,于 2014 年将绝大部分功能捐赠给 Apache 社区,并于 2015 年 8 月毕业成为 Apache 的顶级项目。Ignite 目前一直保持着高强度的快速迭代式开发,基本一个季度发布一个大版本,从提交数量、版本发布数量等若干指标来评估,一直保持在 Apache 社区 300 多个开源项目的前五位。目前已经聚拢了来自多家组织或公司的众多开发者,处于非常活跃的状态,开发者社区和产品生态正在形成中。
Spark
作为 Hadoop 生态圈重要成员的 Spark 于 2009 年由 Matei Zaharia 在加州大学伯克利分校 AMPLab 开发,于 2013 年 6 月捐赠给 Apache 基金会并切换协议至 Apache2.0,2014 年 2 月毕业成为 Apache 的顶级项目。鉴于 Spark 核心计算模型的先进性,它吸引了众多大企业和组织的积极参与,促成了 Spark 的高速发展和社区的空前繁荣,随着 Spark 技术不断地向纵深发展以及向外延伸,形成了庞大的 Spark 社区和生态圈,目前几乎成为了大数据领域影响力最大的开源项目。
二、定位
Ignite 和 Spark 都是分布式架构,都归类于目前的大数据技术类别,二者都是利用大量内存的高性能,为原有的技术方案进行提速,但是定位差别很大。
Ignite
Ignite 的核心定位是一个分布式的 内存缓存解决方案 ,通过将数据保存在内存中,提供比传统的基于磁盘的方案更快的性能。然后在分布式缓存的基础上,一方面进一步深入,通过标准 SQL 功能的引入,向分布式内存数据库的方向发展,一方面功能不断扩展,引入了内存计算、流数据处理、机器学习等功能。Ignite 部署灵活,可以轻易地集成进已有的系统,非常方便地与已有的数据库系统集成(NoSQL、HDFS 也支持),为已有的业务进行加速服务。不颠覆已有架构,是 Ignite 很重要的逻辑。
Spark
Spark 的核心定位是一个 分布式统一大数据分析引擎 ,通过先进的 RDD 模型和大量内存的使用,解决了使用 Hadoop 的 MapReduce 进行多轮迭代式计算的性能问题。然后在 RDD 的基础上不断完善,引入了 Dataset 和 DataFrame、SparkSQL、Spark Streaming、SparkML 等更高级的功能。Spark 对 Hadoop 技术栈有非常好的支持,很多可以直接集成,虽然也可以支持 RDBMS 的读写,但是这不是 Spark 主要的关注方向。
三、核心技术
Ignite 和 Spark 核心技术截然不同。
Ignite
Ignite 的核心数据结构为分布式哈希,即键-值型存储,和 Redis 等可以归于同一类,对于分布式内存数据库,核心技术来源于 H2 数据库,也即 Ignite 对 SQL 的支持来源于 H2 的 SQL 引擎。Ignite 的核心计算模型为 MapReduce+支持 SQL 查询的缓存优化。
Ignite 的内存数据模型为固化内存架构,同时支持内存存储和磁盘存储(可选)。数据保存在堆外,因此只要内存够用,不用担心内存溢出,也不用担心大量占用内存导致垃圾回收暂停。
Spark
Spark 的核心是建立在统一的抽象 RDD 之上,使得 Spark 的各个组件可以无缝进行集成,在同一个应用程序中完成大数据计算任务。RDD 的设计理念源自 AMP 实验室发表的论文《 Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 》。RDD 可以认为是 MapReduce 的超集,也即 RDD 也可以实现传统的 MapReduce 计算机制。
四、部署模型
Ignite 和 Spark 的组网基本模式有很大的不同,但在更高层面的资源管理上,支持能力是差不多的。
Ignite
Ignite 集群基于无共享架构,所有的集群节点都是平等的、独立的,整个集群不存在单点故障。 通过灵活的 Discovery SPI 组件,Ignite 节点可以自动地发现对方,因此只要需要,可以轻易地对集群进行缩放。
Ignite 可以独立运行,可以组成集群,可以运行于 Kubernetes 和 Docker 容器中,也可以运行在 Apache Mesos 以及 Hadoop Yarn 上,可以运行于虚拟机和云环境,也可以运行于物理机,从技术上来说,集群部署在哪里,是没有限制的。
Ignite 还支持嵌入式部署,也就是和应用集成在一起。
Spark
Spark 支持四种分布式部署方式:分别是 Standalone、Spark on Mesos、Spark on YARN 和 Kubernetes。
Spark 的部署属于 Master/Slave 模式,可能存在单点故障问题,但是可以通过 ZooKeeper 解决。
五、功能
内存计算
Ignite 和 Spark 都有内存计算的能力,尤其内存计算是 Spark 的主打功能,从技术原理上来看它们的能力:SparkRDD > Ignite MapReduce+Cache > Hadoop MapReduce。
但具体来说,Ignite 的计算模型优于 Hadoop 毋庸置疑。但是 Ignite 和 Spark,虽然 Ignite 技术原理上不如 SparkRDD 先进,但是落实到具体的实践中,则要看具体的业务场景、技术人员对技术和设计的掌控力、代码优化程度等,无法直接下结论,这个要具体问题具体分析。
Spark 擅长的多轮迭代式计算、交互式计算、图计算等,Ignite 则没有对应的解决方案。
Ignite
Ignite 的计算功能原理与 Hadoop 一致,都是 MapReduce 范式,即可以将一个批量任务拆分为多个部分,然后在不同的节点并行执行,这样就可以并行地利用所有节点的资源,来减少计算任务的整体执行时间。
但是 Ignite 的计算有两个重要的独特之处,一个是鉴于 Ignite 灵活的部署模型,Ignite 可以是离线计算,也可以是在线计算,对于在线的场景,比如 OLTP 业务,它可以通过将请求中的计算负载同步地放在多个可用节点上,然后将结果返回,这样可以提高整个系统的扩展性和容错能力。 另一个是计算可以和数据并置,即计算会被发送到要处理的数据所在的节点,这样会使开销最小化。
Spark
Spark 的计算模型从原理上来说,作为 MapReduce 的超集是非常先进的,Spark 也具有 MapReduce 的机制和开发接口,所以用 Spark 实现 MapReduce 计算模型是可以的。
Spark 的核心概念 RDD,作为一个通用的数据抽象,着重解决了 MapReduce 模型在处理多轮迭代式算法(比如机器学习、图算法等)的性能瓶颈,避免了中间结果落盘导致的大量数据复制、磁盘 IO 和序列化开销。但是 Spark 的计算功能是按照离线系统设计的,无法实现 Ignite 的在线计算功能。
存储支持能力
Ignite 和 Spark 都可以将第三方存储作为数据来源用作后续的处理,两者对第三方存储的支持程度、侧重点完全不同。这里说的第三方存储,暂时划分为传统的 RDBMS 和 NoSQL(HDFS、Hive、Cassandra 等)。但是 Ignite 在支持第三方存储的同时,本身还具有原生持久化的能力。
Ignite RDBMS:Ignite 作为一个缓存系统,天然对 RDBMS 有良好的支持,基本上只要支持 JDBC/ODBC 协议的数据库都没有问题。对于数据的加载、数据的读写及其一致性(事务)保证、各种工具的支持、各种通信协议的支持都一应俱全,是一个完整的方案; NoSQL:Ignite 对于各种 NoSQL 数据库的支持是有限的,因为功能定位的原因,不是任何 NoSQL 产品都适合和 Ignite 整合进而提升能力,就目前来说,Ignite 在不同的功能场景对 NoSQL 提供了支持,包括对 HDFS 的支持,也包括与 Cassandra 的原生集成; 原生持久化:Ignite 基于固化内存架构,提供了原生持久化,可以同时处理存储于内存和磁盘上的数据和索引,它将内存计算的性能和扩展性与磁盘持久化和强一致性整合到一个系统中。 原生持久化以有限的性能损失,透明地提供了更强大的功能,即使整个集群重启,内存不需要预热,数据可以直接访问。
Spark RDBMS:SparkRDD 可以将 RDBMS 作为数据来源之一,支持 RDBMS 数据的批量读写,也支持各种类型的 RDBMS,但是 Spark 对 RDBMS 的读写,属于批量模式,Spark 更多地会将 RDBMS 作为分析型业务的数据来源之一,最后如有必要,则将业务分析的结果批量回写 RDBMS; NoSQL:Spark 原生支持 JDBC、JSON、Parquet、csv、libsvm 以及 orcFile 等,也可以通过扩展接口自定义数据源。Spark 可以直接或者通过各种连接器读取 Hive、Hbase、Cassandra 中的数据,然后创建对应的 RDD,写入也是同理,这个能力是 Ignite 所不具备的; 原生持久化:Spark 不具备原生的持久化能力。
SQL
Ignite 和 Spark 都支持 SQL,但是两者的定位和能力,有所不同。
Ignite
Ignite SQL 目前的语法兼容于 ANSI-99,支持查询、删除、更新与插入,但语法和功能与标准并不完全一致。Ignite 如果做好了数据并置,SQL 查询的性能是很好的,同时 Ignite 还支持索引,这都进一步提升了 Ignite SQL 的能力。另外,Ignite SQL 对缓存的功能进行了极大的增强,通常用于缓存的在线查询和计算,用于离线数据处理也是可以的。
Spark
SparkSQL 最初来源于 Shark 项目,后来两者进行了合并,SparkSQL 构建于 Dataset/DataFrame 机制基础上,目前只支持查询,主要适用于分析型业务以及对来自不同数据源的结构化数据进行处理。它也可以进行交互式查询,因为不支持索引等等原因,所以性能较差,响应时间可能较长。
数据一致性(事务)
Ignite
Ignite 整体来说对事务的支持还不完善,具体来说,在键-值 API 层面,有完善的事务机制,主要原理来自于经过优化的二阶段提交协议,但是 SQL 层面的 DML 语句还不支持事务,未来版本会解决该问题。
在计算层面,因为支持丰富的编程接口,也可以非常容易地与各种开源的 ORM 框架集成,所以也可以方便地对事务进行细粒度的控制,比如 CRUD 都是没问题的。
Spark
SparkSQL 本身并不提供事务机制。Spark 本身也不适用于 RDBMS 的细粒度数据维护,RDBMS 对于 Spark 来说,只是数据的一个来源和存储地之一,通常都是批量操作,如果批量操作失败,Spark 有容错机制可以重来,以保证整体的一致性。
流计算
Spark 有 Spark Streaming,Ignite 也支持流数据处理。
Ignite
Ignite 可以与主流的流处理技术和框架进行集成,比如 Kafka、Camel、Storm 与 JMS,提供可扩展和容错的能力。流处理技术为 Ignite 提供了一种数据加载机制,针对流式数据,Ignite 也提供了各种处理和查询功能。Ignite 社区官方提供了 10 种流处理技术的集成实现,利用统一的 API,开发者也可以自行开发流处理技术实现。Ignite 为所有流入 Ignite 的数据以可扩展和容错的方式提供至少一次保证。
Spark
Spark Streaming 是基于 Spark 的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,即以时间为单位切分数据流,每个切片内的数据对应一个 RDD,进而可以采用 Spark 引擎进行快速计算。其同样支持众多的数据源,内部的数据表示形式为 DStream。Spark Streaming 吞吐量高,可以做复杂的业务逻辑,但是秒级别的延迟是否符合业务需求需要确认。Spark Streaming 可以与 Spark 其他技术完美集成,包括 SparkML、SparkSQL 等。
机器学习
Ignite 和 Spark 都支持机器学习。
Ignite
Ignite 从 2.5 版本开始,提供了完整的机器学习解决方案,Ignite 的机器学习有两个优点:一个是如果已经在 Ignite 中持有了大量的数据,那么继续在 Ignite 中进行机器学习的训练和推理,就不需要在不同系统间进行 ETL 的等待,提高效率。另一个是 Ignite 提供了一系列的机器学习和深度学习算法,对 Ignite 的分布式并置处理进行优化,这样在处理大规模的数据集或者不断增长的输入数据流时,提供了内存级的速度和近乎无限的扩展性,而不需要将数据移到另外的存储。目前支持的算法包括回归、分类、聚类以及对数据进行预处理等。另外 Ignite 还支持了一组遗传算法,该算法适合于以最优的方式检索大量复杂的数据集。
Spark
Spark 很早就包含了机器学习库,RDD 模型面向的一个主要场景就是机器学习这样的多轮迭代式计算。目前的 Spark 机器学习库有 2 个实现,正在逐步向 SparkML 过渡,SparkML 基于 DataFrame API,更强大更灵活,而传统的 MLlib 会处于维护状态。SparkML 基于 DataFrames 对 API 进行了统一,使用体验更友好。可以使用 SparkSQL 等更高级的功能,支持流水线,特别是特征变换。Spark 的机器学习因为 RDD 的原因性能更好,支持的算法也更多。
图计算
Ignite
暂不支持
Spark
Spark 中包含了 GraphX,这是一个图计算组件。它在 RDD 基础上引入了新的 Graph 抽象,为了支持图形计算,GraphX 公开了一组基本运算符(例如子图、连接顶点和聚合消息)以及 Pregel API 的优化变型。此外,GraphX 还包括了越来越多的图形算法和构造者,以简化图形分析任务。
开发语言和客户端协议
Ignite
Ignite 是以 Java 语言为主进行开发的,因此可以在 JVM 支持的任何操作系统和架构上部署和运行。Java 的 API 支持 Ignite 的所有功能,使用 Java 或者 Scala 开发的应用,相关的逻辑可以直接嵌入 Ignite,然后借助于 SQL 以及键-值操作与集群进行交互,执行分布式计算和机器学习算法等等。
除了 Java,Ignite 还支持 .NET 平台与 C++,Ignite.NET 和 Ignite C++ 使用 JNI,会把大部分的调用转发给 Java。
Ignite 还支持使用标准的 JDBC 或者 ODBC 连接,可以像其它 SQL 存储一样与 Ignite 进行交互。Ignite 还为 Java、.NET 和 C++ 开发者提供原生的 SQL API,性能更好。
Ignite 还支持其它的语言访问,比如 Python、Ruby、PHP 与 NodeJS,另外还可以考虑使用 Ignite 的二进制客户端协议接入集群。
Spark
Spark 使用 Scala 语言开发,目前支持使用 Scala、Java、Python、R 语言开发 Spark 程序。
监控运维工具支持
Ignite
Ignite 开源版没有提供图形化的监控工具,但是提供了简易的命令行工具,同时为了简化开发,Ignite 提供了图形化的 Web 控制台。
Ignite 运行时可以通过 API 接口获取大量的指标,通过编程的方式了解集群的状况。
如果需要强大的监控运维工具,可以购买 GridGain 的商业版软件和服务。如果搭建的是一个小规模的集群,鉴于 Ignite 的无共享架构,部署运维都是比较简单的。
Spark
Spark 启动后会有一个 Web 控制台,虽然不是很美观,但是可以从总体上看到 Spark 的当前运行状态。
Spark 属于 Master/Slave 模式,如果直接拿开源版本搭建大规模集群,部署运维还是非常麻烦的,但是国内有很多厂商开发包含 Spark 组件的大数据平台,为部署和运维提供了很大的便利。
六、总结
综上所述,Ignite 和 Spark 功能都很全面,已经脱离了简单开源技术组件的范围,都成为了自成体系的开源大数据平台。上面主要对 Ignite 和 Spark 的主要功能做了简单的梳理对比,不一定全面,也没有对其各自特有的功能进行梳理。但经过这么一些分析,还是可以得出这样一个结论: 两者差别很大,定位不同,因此会有不同的适用领域 。
Ignite
Ignite 以缓存为中心构建大数据体系,底层存储模型更偏向传统关系型数据架构,上层为应用开发的便利做了大量的工作,包括为各种常见语言和协议提供支持。中间核心层在缓存的基础上不断向外扩展,功能日趋丰富强大。
Ignite 从定位上来说有两个突出点,一是可以独立组网,构建独立的大数据平台,然后企业在其上开发全新的大数据应用,包括缓存、计算、流数据处理、机器学习应用等等。二是还可以与传统应用紧密整合,在不颠覆已有架构的前提下,帮助用户进行传统应用的分布式架构转型。为运行多年的复杂、运行缓慢、技术架构落后的业务系统,提供加速能力的同时,引入众多的先进功能,大幅提升原有系统的能力从而延长已有架构的寿命,产生更大的价值,保护客户原有投资。
Ignite 的定位和架构,与 Hadoop 体系大数据组件有很大的不同,但是并不冲突,即使企业已经部署了基于 Hadoop 技术体系的大数据平台,那么也可以继续引入 Ignite 作为补充。
Spark
Spark 以计算为中心构建大数据体系,底层存储对各种数据源进行了抽象,总体上更偏向非结构化的数据,上层应用支持多种语言,核心层基于 RDD 模型,然后进行了大量的扩展,支持了更多更高级的功能,比如 SparkSQL、Spark Streaming、SparkML 与 Spark GraphX 等。Spark 的核心优势是进行多轮迭代式计算、交互式计算以及图计算等。
Spark 是围绕 RDD 构建生态,用户可以以 Spark 为中心搭建大数据平台,满足大量数据的获取、清洗、处理、加载、计算、存储等需求,核心定位是解决大数据的分析问题。虽然 Spark 的计算能力也可以处理传统的关系型数据,但这并非 Spark 的强项,因此和传统业务系统并没有太多的交集。企业基于 Spark 搭建大数据平台之后,其上的应用基本需要全新开发。传统的数据处理业务,即使适合用 Spark 实现,原有的业务逻辑也无法直接、简单地移植进入 Spark 技术堆栈。Spark 技术堆栈更适合用于处理传统技术处理起来很麻烦、性能很差、数据量又很大的非结构化数据,Spark 适合对众多系统的相关数据进行整合,通过分析后能产生更大价值的业务场景。
作者
李玉珏 ,架构师,有丰富的架构设计和技术研发团队管理经验,社区技术翻译作者以及撰稿人,开源技术贡献者。Apache Ignite 技术中文文档翻译作者,长期在国内进行 Ignite 技术的推广/技术支持/咨询工作。
本文系作者投稿文章。欢迎投稿。
投稿内容要求 互联网技术相关 ,包括但不限于开发语言、网络、数据库、架构、运维、前端、DevOps(DevXXX)、AI、区块链、存储、移动、安全、技术团队管理等内容。 文章不需要首发,可以是已经在开源中国博客或网上其它平台发布过的。但是 鼓励首发 ,首发内容被收录可能性较大。 如果你是记录某一次解决了某一个问题(这在博客中占绝大比例),那么需要将问题的 前因后果 描述清楚,最直接的就是结合图文等方式将问题复现,同时完整地说明解决思路与最终成功的方案。 如果你是分析某一技术理论知识,请从定义、应用场景、实际案例、关键技术细节、观点等方面,对其进行较为 全面 地介绍。 如果你是以实际案例分享自己或者公司对诸如某一架构模型、通用技术、编程语言、运维工具的实践,那么请将事件相关背景、具体技术细节、演进过程、思考、应用效果等方面 描述清楚 。 其它未尽 case 具体情况具体分析,不虚的,文章投过来试试先,比如我们并不拒绝就某个热点事件对其进行的报导、深入解析。
投稿方式 以 Word 或者 Markdown 文档的形式将稿件投递到 oscbianji@oschina.cn 邮箱
重要说明 作者需要拥有所投文章的所有权,不能将别人的文章拿过来投递。 投递的文章需要经过审核,如果开源中国编辑觉得需要的话,将与作者一起进一步完善文章,意在使文章更佳、传播更广。 文章版权归作者所有,开源中国获得文章的传播权,可在开源中国各个平台进行文章传播,同时保留文章原始出处和作者信息,可在官方博客中标原创标签。
大数据
2018-09-13 15:53:00
「深度学习福利」大神带你进阶工程师,立即查看>>> import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder} import org.apache.spark.ml.feature.VectorAssembler import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoostClassificationModel} import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} import org.apache.spark.ml.PipelineModel val data = (spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load("/user/spark/security/Affairs.csv")) data.createOrReplaceTempView("res1") val affairs = "case when affairs>0 then 1 else 0 end as affairs," val df = (spark.sql("select " + affairs + "gender,age,yearsmarried,children,religiousness,education,occupation,rating" + " from res1 ")) val categoricals = df.dtypes.filter(_._2 == "StringType") map (_._1) val indexers = categoricals.map( c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx") ) val encoders = categoricals.map( c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc").setDropLast(false) ) val colArray_enc = categoricals.map(x => x + "_enc") val colArray_numeric = df.dtypes.filter(_._2 != "StringType") map (_._1) val final_colArray = (colArray_numeric ++ colArray_enc).filter(!_.contains("affairs")) val vectorAssembler = new VectorAssembler().setInputCols(final_colArray).setOutputCol("features") /* val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler)) pipeline.fit(df).transform(df) */ /// // Create an XGBoost Classifier val xgb = new XGBoostEstimator(Map("num_class" -> 2, "num_rounds" -> 5, "objective" -> "binary:logistic", "booster" -> "gbtree")).setLabelCol("affairs").setFeaturesCol("features") // XGBoost paramater grid val xgbParamGrid = (new ParamGridBuilder() .addGrid(xgb.round, Array(10)) .addGrid(xgb.maxDepth, Array(10,20)) .addGrid(xgb.minChildWeight, Array(0.1)) .addGrid(xgb.gamma, Array(0.1)) .addGrid(xgb.subSample, Array(0.8)) .addGrid(xgb.colSampleByTree, Array(0.90)) .addGrid(xgb.alpha, Array(0.0)) .addGrid(xgb.lambda, Array(0.6)) .addGrid(xgb.scalePosWeight, Array(0.1)) .addGrid(xgb.eta, Array(0.4)) .addGrid(xgb.boosterType, Array("gbtree")) .addGrid(xgb.objective, Array("binary:logistic")) .build()) // Create the XGBoost pipeline val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler, xgb)) // Setup the binary classifier evaluator val evaluator = (new BinaryClassificationEvaluator() .setLabelCol("affairs") .setRawPredictionCol("prediction") .setMetricName("areaUnderROC")) // Create the Cross Validation pipeline, using XGBoost as the estimator, the // Binary Classification evaluator, and xgbParamGrid for hyperparameters val cv = (new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(xgbParamGrid) .setNumFolds(3) .setSeed(0)) // Create the model by fitting the training data val xgbModel = cv.fit(df) // Test the data by scoring the model val results = xgbModel.transform(df) // Print out a copy of the parameters used by XGBoost, attention pipeline (xgbModel.bestModel.asInstanceOf[PipelineModel] .stages(5).asInstanceOf[XGBoostClassificationModel] .extractParamMap().toSeq.foreach(println)) results.select("affairs","prediction").show println("---Confusion Matrix------") results.stat.crosstab("affairs","prediction").show() // What was the overall accuracy of the model, using AUC val auc = evaluator.evaluate(results) println("----AUC--------") println("auc="+auc)
大数据
2018-09-13 11:33:00
「深度学习福利」大神带你进阶工程师,立即查看>>> import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder} import org.apache.spark.ml.feature.VectorAssembler import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoostClassificationModel} import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} import org.apache.spark.ml.PipelineModel val data = (spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load("/Affairs.csv")) data.createOrReplaceTempView("res1") val affairs = "case when affairs>0 then 1 else 0 end as affairs," val df = (spark.sql("select " + affairs + "gender,age,yearsmarried,children,religiousness,education,occupation,rating" + " from res1 ")) val categoricals = df.dtypes.filter(_._2 == "StringType") map (_._1) val indexers = categoricals.map( c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx") ) val encoders = categoricals.map( c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc").setDropLast(false) ) val colArray_enc = categoricals.map(x => x + "_enc") val colArray_numeric = df.dtypes.filter(_._2 != "StringType") map (_._1) val final_colArray = (colArray_numeric ++ colArray_enc).filter(!_.contains("affairs")) val vectorAssembler = new VectorAssembler().setInputCols(final_colArray).setOutputCol("features") /* val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler)) pipeline.fit(df).transform(df) */ /// // Create an XGBoost Classifier val xgb = new XGBoostEstimator(Map("num_class" -> 2, "num_rounds" -> 5, "objective" -> "binary:logistic", "booster" -> "gbtree")).setLabelCol("affairs").setFeaturesCol("features") // XGBoost paramater grid val xgbParamGrid = (new ParamGridBuilder() .addGrid(xgb.round, Array(10)) .addGrid(xgb.maxDepth, Array(10,20)) .addGrid(xgb.minChildWeight, Array(0.1)) .addGrid(xgb.gamma, Array(0.1)) .addGrid(xgb.subSample, Array(0.8)) .addGrid(xgb.colSampleByTree, Array(0.90)) .addGrid(xgb.alpha, Array(0.0)) .addGrid(xgb.lambda, Array(0.6)) .addGrid(xgb.scalePosWeight, Array(0.1)) .addGrid(xgb.eta, Array(0.4)) .addGrid(xgb.boosterType, Array("gbtree")) .addGrid(xgb.objective, Array("binary:logistic")) .build()) // Create the XGBoost pipeline val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler, xgb)) // Setup the binary classifier evaluator val evaluator = (new BinaryClassificationEvaluator() .setLabelCol("affairs") .setRawPredictionCol("prediction") .setMetricName("areaUnderROC")) // Create the Cross Validation pipeline, using XGBoost as the estimator, the // Binary Classification evaluator, and xgbParamGrid for hyperparameters val cv = (new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(xgbParamGrid) .setNumFolds(3) .setSeed(0)) // Create the model by fitting the training data val xgbModel = cv.fit(df) // Test the data by scoring the model val results = xgbModel.transform(df) // Print out a copy of the parameters used by XGBoost, attention pipeline (xgbModel.bestModel.asInstanceOf[PipelineModel] .stages(5).asInstanceOf[XGBoostClassificationModel] .extractParamMap().toSeq.foreach(println)) results.select("affairs","prediction").show println("---Confusion Matrix------") results.stat.crosstab("affairs","prediction").show() // What was the overall accuracy of the model, using AUC val auc = evaluator.evaluate(results) println("----AUC--------") println("auc="+auc)
大数据
2018-09-13 11:31:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
hive里面倒是有个percentile函数和percentile_approx函数,其使用方式为percentile(col, p)、percentile_approx(col, p),p∈(0,1)
其中percentile要求输入的字段必须是int类型的,而percentile_approx则是数值类似型的都可以
其实percentile_approx还有一个参数B:percentile_approx(col, p,B),参数B控制内存消耗的近似精度,B越大,结果的准确度越高。默认为10,000。当col字段中的distinct值的个数小于B时,结果为准确的百分位数。
如果我要求多个分位数怎么办呢?,可以把p换为array(p1,p2,p3…),即
percentile_approx(col,array(0.05,0.5,0.95),9999)
如果不放心的话,就给col再加个转换:
percentile_approx(cast(col as double),array(0.05,0.5,0.95),9999)
其输出结果长这样:
[0.0,4001.0,4061.0]
没法直接用,再加个转换:
explode(percentile_approx(cast(col as double),array(0.05,0.5,0.95),9999))as percentile
大数据
2018-09-12 17:58:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
测试数据 -- 建表 create table student_scores( id int, studentId int, language int, math int, english int, classId string, departmentId string ); -- 写入数据 insert into table student_scores values (1,111,68,69,90,'class1','department1'), (2,112,73,80,96,'class1','department1'), (3,113,90,74,75,'class1','department1'), (4,114,89,94,93,'class1','department1'), (5,115,99,93,89,'class1','department1'), (6,121,96,74,79,'class2','department1'), (7,122,89,86,85,'class2','department1'), (8,123,70,78,61,'class2','department1'), (9,124,76,70,76,'class2','department1'), (10,211,89,93,60,'class1','department2'), (11,212,76,83,75,'class1','department2'), (12,213,71,94,90,'class1','department2'), (13,214,94,94,66,'class1','department2'), (14,215,84,82,73,'class1','department2'), (15,216,85,74,93,'class1','department2'), (16,221,77,99,61,'class2','department2'), (17,222,80,78,96,'class2','department2'), (18,223,79,74,96,'class2','department2'), (19,224,75,80,78,'class2','department2'), (20,225,82,85,63,'class2','department2');
聚合开窗函数
count -- count 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 count(math) over() as count1, -- 以按classId分组的所有行作为窗口 count(math) over(partition by classId) as count2, -- 以按classId分组、按math排序的所有行作为窗口 count(math) over(partition by classId order by math) as count3, -- 以按classId分组、按math排序、按 当前行+往前1行+往后2行的行作为窗口 count(math) over(partition by classId order by math rows between 1 preceding and 2 following) as count4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid count1 count2 count3 count4 111 69 department1 class1 9 5 1 3 113 74 department1 class1 9 5 2 4 112 80 department1 class1 9 5 3 4 115 93 department1 class1 9 5 4 3 114 94 department1 class1 9 5 5 2 124 70 department1 class2 9 4 1 3 121 74 department1 class2 9 4 2 4 123 78 department1 class2 9 4 3 3 122 86 department1 class2 9 4 4 2 结果解释: studentid=115,count1为所有的行数9,count2为分区class1中的行数5,count3为分区class1中math值<=93的行数4, count4为分区class1中math值向前+1行向后+2行(实际只有1行)的总行数3。
sum -- sum开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 sum(math) over() as sum1, -- 以按classId分组的所有行作为窗口 sum(math) over(partition by classId) as sum2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 sum(math) over(partition by classId order by math) as sum3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 sum(math) over(partition by classId order by math rows between 1 preceding and 2 following) as sum4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid sum1 sum2 sum3 sum4 111 69 department1 class1 718 410 69 223 113 74 department1 class1 718 410 143 316 112 80 department1 class1 718 410 223 341 115 93 department1 class1 718 410 316 267 114 94 department1 class1 718 410 410 187 124 70 department1 class2 718 308 70 222 121 74 department1 class2 718 308 144 308 123 78 department1 class2 718 308 222 238 122 86 department1 class2 718 308 308 164 结果解释: 同count开窗函数
min -- min 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 min(math) over() as min1, -- 以按classId分组的所有行作为窗口 min(math) over(partition by classId) as min2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 min(math) over(partition by classId order by math) as min3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 min(math) over(partition by classId order by math rows between 1 preceding and 2 following) as min4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid min1 min2 min3 min4 111 69 department1 class1 69 69 69 69 113 74 department1 class1 69 69 69 69 112 80 department1 class1 69 69 69 74 115 93 department1 class1 69 69 69 80 114 94 department1 class1 69 69 69 93 124 70 department1 class2 69 70 70 70 121 74 department1 class2 69 70 70 70 123 78 department1 class2 69 70 70 74 122 86 department1 class2 69 70 70 78 结果解释: 同count开窗函数
max -- max 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 max(math) over() as max1, -- 以按classId分组的所有行作为窗口 max(math) over(partition by classId) as max2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 max(math) over(partition by classId order by math) as max3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 max(math) over(partition by classId order by math rows between 1 preceding and 2 following) as max4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid max1 max2 max3 max4 111 69 department1 class1 94 94 69 80 113 74 department1 class1 94 94 74 93 112 80 department1 class1 94 94 80 94 115 93 department1 class1 94 94 93 94 114 94 department1 class1 94 94 94 94 124 70 department1 class2 94 86 70 78 121 74 department1 class2 94 86 74 86 123 78 department1 class2 94 86 78 86 122 86 department1 class2 94 86 86 86 结果解释: 同count开窗函数
avg -- avg 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 avg(math) over() as avg1, -- 以按classId分组的所有行作为窗口 avg(math) over(partition by classId) as avg2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 avg(math) over(partition by classId order by math) as avg3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 avg(math) over(partition by classId order by math rows between 1 preceding and 2 following) as avg4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid avg1 avg2 avg3 avg4 111 69 department1 class1 79.77777777777777 82.0 69.0 74.33333333333333 113 74 department1 class1 79.77777777777777 82.0 71.5 79.0 112 80 department1 class1 79.77777777777777 82.0 74.33333333333333 85.25 115 93 department1 class1 79.77777777777777 82.0 79.0 89.0 114 94 department1 class1 79.77777777777777 82.0 82.0 93.5 124 70 department1 class2 79.77777777777777 77.0 70.0 74.0 121 74 department1 class2 79.77777777777777 77.0 72.0 77.0 123 78 department1 class2 79.77777777777777 77.0 74.0 79.33333333333333 122 86 department1 class2 79.77777777777777 77.0 77.0 82.0 结果解释: 同count开窗函数
first_value -- first_value 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 first_value(math) over() as first_value1, -- 以按classId分组的所有行作为窗口 first_value(math) over(partition by classId) as first_value2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 first_value(math) over(partition by classId order by math) as first_value3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 first_value(math) over(partition by classId order by math rows between 1 preceding and 2 following) as first_value4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid first_value1 first_value2 first_value3 first_value4 111 69 department1 class1 69 69 69 69 113 74 department1 class1 69 69 69 69 112 80 department1 class1 69 69 69 74 115 93 department1 class1 69 69 69 80 114 94 department1 class1 69 69 69 93 124 70 department1 class2 69 74 70 70 121 74 department1 class2 69 74 70 70 123 78 department1 class2 69 74 70 74 122 86 department1 class2 69 74 70 78 结果解释: studentid=124 first_value1:第一个值是69,first_value2:classId=class1分区 math的第一个值是69。
last_value -- last_value 开窗函数 select studentId,math,departmentId,classId, -- 以符合条件的所有行作为窗口 last_value(math) over() as last_value1, -- 以按classId分组的所有行作为窗口 last_value(math) over(partition by classId) as last_value2, -- 以按classId分组、按math排序后、按到当前行(含当前行)的所有行作为窗口 last_value(math) over(partition by classId order by math) as last_value3, -- 以按classId分组、按math排序后、按当前行+往前1行+往后2行的行作为窗口 last_value(math) over(partition by classId order by math rows between 1 preceding and 2 following) as last_value4 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid last_value1 last_value2 last_value3 last_value4 111 69 department1 class1 70 93 69 80 113 74 department1 class1 70 93 74 93 112 80 department1 class1 70 93 80 94 115 93 department1 class1 70 93 93 94 114 94 department1 class1 70 93 94 94 124 70 department1 class2 70 70 70 78 121 74 department1 class2 70 70 74 86 123 78 department1 class2 70 70 78 86 122 86 department1 class2 70 70 86 86
lag lag(col,n,default) 用于统计窗口内往上第n个值。 col:列名 n:往上第n行 default:往上第n行为NULL时候,取默认值,不指定则取NULL -- lag 开窗函数 select studentId,math,departmentId,classId, --窗口内 往上取第二个 取不到时赋默认值60 lag(math,2,60) over(partition by classId order by math) as lag1, --窗口内 往上取第二个 取不到时赋默认值NULL lag(math,2) over(partition by classId order by math) as lag2 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid lag1 lag2 111 69 department1 class1 60 NULL 113 74 department1 class1 60 NULL 112 80 department1 class1 69 69 115 93 department1 class1 74 74 114 94 department1 class1 80 80 124 70 department1 class2 60 NULL 121 74 department1 class2 60 NULL 123 78 department1 class2 70 70 122 86 department1 class2 74 74 结果解释: 第3行 lag1:窗口内(69 74 80) 当前行80 向上取第二个值为69 倒数第3行 lag2:窗口内(70 74) 当前行74 向上取第二个值为NULL
lead lead(col,n,default) 用于统计窗口内往下第n个值。 col:列名 n:往下第n行 default:往下第n行为NULL时候,取默认值,不指定则取NULL -- lead开窗函数 select studentId,math,departmentId,classId, --窗口内 往下取第二个 取不到时赋默认值60 lead(math,2,60) over(partition by classId order by math) as lead1, --窗口内 往下取第二个 取不到时赋默认值NULL lead(math,2) over(partition by classId order by math) as lead2 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid lead1 lead2 111 69 department1 class1 80 80 113 74 department1 class1 93 93 112 80 department1 class1 94 94 115 93 department1 class1 60 NULL 114 94 department1 class1 60 NULL 124 70 department1 class2 78 78 121 74 department1 class2 86 86 123 78 department1 class2 60 NULL 122 86 department1 class2 60 NULL 结果解释: 第4行lead1 窗口内向下第二个值为空,赋值60
cume_dist 计算某个窗口或分区中某个值的累积分布。假定升序排序,则使用以下公式确定累积分布: 小于等于当前值x的行数 / 窗口或partition分区内的总行数。其中,x 等于 order by 子句中指定的列的当前行中的值。 -- cume_dist 开窗函数 select studentId,math,departmentId,classId, -- 统计小于等于当前分数的人数占总人数的比例 cume_dist() over(order by math) as cume_dist1, -- 统计大于等于当前分数的人数占总人数的比例 cume_dist() over(order by math desc) as cume_dist2, -- 统计分区内小于等于当前分数的人数占总人数的比例 cume_dist() over(partition by classId order by math) as cume_dist3 from student_scores where departmentId='department1'; 结果 studentid math departmentid classid cume_dist1 cume_dist2 cume_dist3 111 69 department1 class1 0.1111111111111111 1.0 0.2 113 74 department1 class1 0.4444444444444444 0.7777777777777778 0.4 112 80 department1 class1 0.6666666666666666 0.4444444444444444 0.6 115 93 department1 class1 0.8888888888888888 0.2222222222222222 0.8 114 94 department1 class1 1.0 0.1111111111111111 1.0 124 70 department1 class2 0.2222222222222222 0.8888888888888888 0.25 121 74 department1 class2 0.4444444444444444 0.7777777777777778 0.5 123 78 department1 class2 0.5555555555555556 0.5555555555555556 0.75 122 86 department1 class2 0.7777777777777778 0.3333333333333333 1.0 结果解释: 第三行: cume_dist1=小于等于80的人数为6/总人数9=0.6666666666666666 cume_dist2=大于等于80的人数为4/总人数9=0.4444444444444444 cume_dist3=分区内小于等于80的人数为3/分区内总人数5=0.6
排序开窗函数
rank 确定一组值中一个值的排名。如果存在partition by ,则为每个分区组中的每个值排名。排名可能不是连续的。例如,如果两个行的排名为 1,则下一个排名为 3。 -- rank 开窗函数 select *, -- 对全部学生按数学分数排序 rank() over(order by math) as rank1, -- 对院系 按数学分数排序 rank() over(partition by departmentId order by math) as rank2, -- 对每个院系每个班级 按数学分数排序 rank() over(partition by departmentId,classId order by math) as rank3 from student_scores; 结果 id studentid language math english classid departmentid rank1 rank2 rank3 1 111 68 69 90 class1 department1 1 1 1 3 113 90 74 75 class1 department1 3 3 2 2 112 73 80 96 class1 department1 9 6 3 5 115 99 93 89 class1 department1 15 8 4 4 114 89 94 93 class1 department1 17 9 5 9 124 76 70 76 class2 department1 2 2 1 6 121 96 74 79 class2 department1 3 3 2 8 123 70 78 61 class2 department1 7 5 3 7 122 89 86 85 class2 department1 14 7 4 15 216 85 74 93 class1 department2 3 1 1 14 215 84 82 73 class1 department2 11 5 2 11 212 76 83 75 class1 department2 12 6 3 10 211 89 93 60 class1 department2 15 8 4 12 213 71 94 90 class1 department2 17 9 5 13 214 94 94 66 class1 department2 17 9 5 18 223 79 74 96 class2 department2 3 1 1 17 222 80 78 96 class2 department2 7 3 2 19 224 75 80 78 class2 department2 9 4 3 20 225 82 85 63 class2 department2 13 7 4 16 221 77 99 61 class2 department2 20 11 5
dense_rank dense_rank与rank有一点不同,当排名一样的时候,接下来的行是连续的。如两个行的排名为 1,则下一个排名为 2。 -- dense_rank 开窗函数 select *, -- 对全部学生按数学分数排序 dense_rank() over(order by math) as dense_rank1, -- 对院系 按数学分数排序 dense_rank() over(partition by departmentId order by math) as dense_rank2, -- 对每个院系每个班级 按数学分数排序 dense_rank() over(partition by departmentId,classId order by math) as dense_rank3 from student_scores; 结果: id studentid language math english classid departmentid dense_rank1 dense_rank2 dense_rank3 1 111 68 69 90 class1 department1 1 1 1 3 113 90 74 75 class1 department1 3 3 2 2 112 73 80 96 class1 department1 5 5 3 5 115 99 93 89 class1 department1 10 7 4 4 114 89 94 93 class1 department1 11 8 5 9 124 76 70 76 class2 department1 2 2 1 6 121 96 74 79 class2 department1 3 3 2 8 123 70 78 61 class2 department1 4 4 3 7 122 89 86 85 class2 department1 9 6 4 15 216 85 74 93 class1 department2 3 1 1 14 215 84 82 73 class1 department2 6 4 2 11 212 76 83 75 class1 department2 7 5 3 10 211 89 93 60 class1 department2 10 7 4 12 213 71 94 90 class1 department2 11 8 5 13 214 94 94 66 class1 department2 11 8 5 18 223 79 74 96 class2 department2 3 1 1 17 222 80 78 96 class2 department2 4 2 2 19 224 75 80 78 class2 department2 5 3 3 20 225 82 85 63 class2 department2 8 6 4 16 221 77 99 61 class2 department2 12 9 5
ntile 将分区中已排序的行划分为大小尽可能相等的指定数量的排名的组,并返回给定行所在的组的排名。 -- ntile 开窗函数 select *, -- 对分区内的数据分成两组 ntile(2) over(partition by departmentid order by math) as ntile1, -- 对分区内的数据分成三组 ntile(3) over(partition by departmentid order by math) as ntile2 from student_scores; 结果 id studentid language math english classid departmentid ntile1 ntile2 1 111 68 69 90 class1 department1 1 1 9 124 76 70 76 class2 department1 1 1 6 121 96 74 79 class2 department1 1 1 3 113 90 74 75 class1 department1 1 2 8 123 70 78 61 class2 department1 1 2 2 112 73 80 96 class1 department1 2 2 7 122 89 86 85 class2 department1 2 3 5 115 99 93 89 class1 department1 2 3 4 114 89 94 93 class1 department1 2 3 18 223 79 74 96 class2 department2 1 1 15 216 85 74 93 class1 department2 1 1 17 222 80 78 96 class2 department2 1 1 19 224 75 80 78 class2 department2 1 1 14 215 84 82 73 class1 department2 1 2 11 212 76 83 75 class1 department2 1 2 20 225 82 85 63 class2 department2 2 2 10 211 89 93 60 class1 department2 2 2 12 213 71 94 90 class1 department2 2 3 13 214 94 94 66 class1 department2 2 3 16 221 77 99 61 class2 department2 2 3 结果解释: 第8行 ntile1:对分区的数据均匀分成2组后,当前行的组排名为2 ntile2:对分区的数据均匀分成3组后,当前行的组排名为3
row_number 从1开始对分区内的数据排序。 - row_number 开窗函数 select studentid,departmentid,classid,math, -- 对分区departmentid,classid内的数据按math排序 row_number() over(partition by departmentid,classid order by math) as row_number from student_scores; 结果 studentid departmentid classid math row_number 111 department1 class1 69 1 113 department1 class1 74 2 112 department1 class1 80 3 115 department1 class1 93 4 114 department1 class1 94 5 124 department1 class2 70 1 121 department1 class2 74 2 123 department1 class2 78 3 122 department1 class2 86 4 216 department2 class1 74 1 215 department2 class1 82 2 212 department2 class1 83 3 211 department2 class1 93 4 213 department2 class1 94 5 214 department2 class1 94 6 223 department2 class2 74 1 222 department2 class2 78 2 224 department2 class2 80 3 225 department2 class2 85 4 221 department2 class2 99 5 结果解释: 同一分区,相同值,不同序。如studentid=213 studentid=214 值都为94 排序为5,6。
precent_rank 计算给定行的百分比排名。可以用来计算超过了百分之多少的人。如360小助手开机速度超过了百分之多少的人。 (当前行的rank值-1)/(分组内的总行数-1) -- percent_rank 开窗函数 select studentid,departmentid,classid,math, row_number() over(partition by departmentid,classid order by math) as row_number, percent_rank() over(partition by departmentid,classid order by math) as percent_rank from student_scores; 结果 studentid departmentid classid math row_number percent_rank 111 department1 class1 69 1 0.0 113 department1 class1 74 2 0.25 112 department1 class1 80 3 0.5 115 department1 class1 93 4 0.75 114 department1 class1 94 5 1.0 124 department1 class2 70 1 0.0 121 department1 class2 74 2 0.3333333333333333 123 department1 class2 78 3 0.6666666666666666 122 department1 class2 86 4 1.0 216 department2 class1 74 1 0.0 215 department2 class1 82 2 0.2 212 department2 class1 83 3 0.4 211 department2 class1 93 4 0.6 213 department2 class1 94 5 0.8 214 department2 class1 94 6 0.8 223 department2 class2 74 1 0.0 222 department2 class2 78 2 0.25 224 department2 class2 80 3 0.5 225 department2 class2 85 4 0.75 221 department2 class2 99 5 1.0 结果解释: studentid=115,percent_rank=(4-1)/(5-1)=0.75 studentid=123,percent_rank=(3-1)/(4-1)=0.6666666666666666
大数据
2018-09-12 17:55:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. 开启hive的远程连接 // 即使关闭当前会话但进程不会结束 nohup hive --service hiveserver2 --hiveconf hive.server2.thrift.port=10010 &
2. 导入jar包 链接: https://pan.baidu.com/s/1iS36dT7ilWS5qbSiV7raHw 密码:u6fd 添加到java项目中
3. 代码详解 code import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; public class HiveUtil { private Statement statement = null; // HiveUtil实例化时会自动打开连接并导入自定义方法udf public HiveUtil() { open(); Init(); } // 初始化udf自定义方法包(当连接断开时,方法会自动失效),若有新的方法可在后面新增执行语句 private void Init() { try { statement.execute("add jar /home/bigdata/udf.jar"); statement.execute("create temporary function sub as 'com.yulang.udf.SubString'"); statement.execute("create temporary function jsonParse as 'com.yulang.udf.JsonParse'"); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } static { try { // 1.加载驱动 Class.forName("org.apache.hive.jdbc.HiveDriver"); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void open() { try { // 2.打开连接 Connection connection = DriverManager.getConnection("jdbc:hive2://HADOOP01:10010/"); // 3.获得操作对象 - 会话 statement = connection.createStatement(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 创建数据库 - 用户注册时调用 * @param databaseName 根据用户标识生成的数据库名称 */ public void createDatabase(String databaseName) { try { statement.execute("create database " + databaseName); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 切换数据库 - 只对当前会话有效 * @param databaseName 目标数据库名称 */ public void changeDatabase(String databaseName) { try { statement.execute("use " + databaseName); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 获得当前数据库中的数据列表 - 注意切换数据库 * @return 数据表名称的集合 */ public List getTaleList() { List list = new ArrayList<>(); try { ResultSet rs = statement.executeQuery("show tables"); while (rs.next()) { list.add(rs.getString(1)); } } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return list; } /** * 获得数据表的简要信息 * @param tableName 数据表名称 * @return 列名及列的数据类型 */ public List getTableInfo(String tableName){ List list = new ArrayList<>(); try { ResultSet rs = statement.executeQuery("desc " + tableName); while (rs.next()) { list.add(rs.getString(1) + "\t" + rs.getString(2)); } } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return list; } /** * 获取数据表前十条的预览数据 * @param tableName 数据表名称 * @return 数据表预览数据 */ public List getTableData(String tableName){ List list = new ArrayList<>(); try { int size = getTableInfo(tableName).size(); ResultSet rs = statement.executeQuery("select * from " + tableName +" limit 10"); while (rs.next()) { String line = ""; for(int i = 1;i <= size;i ++) { line += rs.getString(i) + "\t"; } list.add(line); } } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return list; } /** * 获得查询sql执行后的返回结果 * @param sql 用户自定义sql * @return sql执行结果集中的所有数据 */ public List getResultData(String sql) { List list = new ArrayList<>(); try { String tableName = "tmp_table"; sql = "create table " + tableName + " as " + sql; // 执行建表语句 statement.execute(sql); // 通过查询方法获取到新建表的数据 list = getTableData("tmp_table"); // 使用完临时表后drop临时表 statement.execute("drop table "+ tableName); } catch (SQLException e) { e.printStackTrace(); list = null; } return list; } }
大数据
2018-09-12 17:50:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
GROUPING SETS
GROUPING SETS作为GROUP BY的子句,允许开发人员在GROUP BY语句后面指定多个统计选项,可以简单理解为多条group by语句通过union all把查询结果聚合起来结合起来,下面是几个实例可以帮助我们了解,
以acorn_3g.test_xinyan_reg为例: [dp@YZSJHL19-87 xjob]$ hive -e "use acorn_3g;desc test_xinyan_reg;" user_id bigint None device_id int None 手机,平板 os_id int None 操作系统类型 app_id int None 手机app_id client_version string None 客户端版本 from_id int None 四级渠道
grouping sets语句 等价hive语句
select device_id,os_id,app_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id grouping sets((device_id)) SELECT device_id,null,null,count(user_id) FROM test_xinyan_reg group by device_id
select device_id,os_id,app_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id grouping sets((device_id,os_id)) SELECT device_id,os_id,null,count(user_id) FROM test_xinyan_reg group by device_id,os_id
select device_id,os_id,app_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id grouping sets((device_id,os_id),(device_id))
select device_id,os_id,app_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id grouping sets((device_id),(os_id),(device_id,os_id),())
SELECT device_id,os_id,null,count(user_id) FROM test_xinyan_reg group by device_id,os_id
UNION ALL
SELECT device_id,null,null,count(user_id) FROM test_xinyan_reg group by device_id
SELECT device_id,null,null,count(user_id) FROM test_xinyan_reg group by device_id
UNION ALL
SELECT null,os_id,null,count(user_id) FROM test_xinyan_reg group by os_id
UNION ALL
SELECT device_id,os_id,null,count(user_id) FROM test_xinyan_reg group by device_id,os_id
UNION ALL
SELECT null,null,null,count(user_id) FROM test_xinyan_reg

CUBE

cube简称数据魔方,可以实现hive多个任意维度的查询,cube(a,b,c)则首先会对(a,b,c)进行group by,然后依次是(a,b),(a,c),(a),(b,c),(b),(c),最后在对全表进行group by,他会统计所选列中值的所有组合的聚合 select device_id,os_id,app_id,client_version,from_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id,client_version,from_id with cube;
等价于以下sql SELECT device_id,null,null,null,null ,count(user_id) FROM test_xinyan_reg group by device_id UNION ALL SELECT null,os_id,null,null,null ,count(user_id) FROM test_xinyan_reg group by os_id UNION ALL SELECT device_id,os_id,null,null,null ,count(user_id) FROM test_xinyan_reg group by device_id,os_id UNION ALL SELECT null,null,app_id,null,null ,count(user_id) FROM test_xinyan_reg group by app_id UNION ALL SELECT device_id,null,app_id,null,null ,count(user_id) FROM test_xinyan_reg group by device_id,app_id UNION ALL SELECT null,os_id,app_id,null,null ,count(user_id) FROM test_xinyan_reg group by os_id,app_id UNION ALL SELECT device_id,os_id,app_id,null,null ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,app_id UNION ALL SELECT null,null,null,client_version,null ,count(user_id) FROM test_xinyan_reg group by client_version UNION ALL SELECT device_id,null,null,client_version,null ,count(user_id) FROM test_xinyan_reg group by device_id,client_version UNION ALL SELECT null,os_id,null,client_version,null ,count(user_id) FROM test_xinyan_reg group by os_id,client_version UNION ALL SELECT device_id,os_id,null,client_version,null ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,client_version UNION ALL SELECT null,null,app_id,client_version,null ,count(user_id) FROM test_xinyan_reg group by app_id,client_version UNION ALL SELECT device_id,null,app_id,client_version,null ,count(user_id) FROM test_xinyan_reg group by device_id,app_id,client_version UNION ALL SELECT null,os_id,app_id,client_version,null ,count(user_id) FROM test_xinyan_reg group by os_id,app_id,client_version UNION ALL SELECT device_id,os_id,app_id,client_version,null ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,app_id,client_version UNION ALL SELECT null,null,null,null,from_id ,count(user_id) FROM test_xinyan_reg group by from_id UNION ALL SELECT device_id,null,null,null,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,from_id UNION ALL SELECT null,os_id,null,null,from_id ,count(user_id) FROM test_xinyan_reg group by os_id,from_id UNION ALL SELECT device_id,os_id,null,null,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,from_id UNION ALL SELECT null,null,app_id,null,from_id ,count(user_id) FROM test_xinyan_reg group by app_id,from_id UNION ALL SELECT device_id,null,app_id,null,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,app_id,from_id UNION ALL SELECT null,os_id,app_id,null,from_id ,count(user_id) FROM test_xinyan_reg group by os_id,app_id,from_id UNION ALL SELECT device_id,os_id,app_id,null,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,app_id,from_id UNION ALL SELECT null,null,null,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by client_version,from_id UNION ALL SELECT device_id,null,null,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,client_version,from_id UNION ALL SELECT null,os_id,null,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by os_id,client_version,from_id UNION ALL SELECT device_id,os_id,null,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,client_version,from_id UNION ALL SELECT null,null,app_id,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by app_id,client_version,from_id UNION ALL SELECT device_id,null,app_id,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,app_id,client_version,from_id UNION ALL SELECT null,os_id,app_id,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by os_id,app_id,client_version,from_id UNION ALL SELECT device_id,os_id,app_id,client_version,from_id ,count(user_id) FROM test_xinyan_reg group by device_id,os_id,app_id,client_version,from_id UNION ALL SELECT null,null,null,null,null ,count(user_id) FROM test_xinyan_reg
ROLL UP
rollup可以实现从右到做递减多级的统计,显示统计某一层次结构的聚合。 select device_id,os_id,app_id,client_version,from_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id,client_version,from_id with rollup;
等价于以下sql select device_id,os_id,app_id,client_version,from_id,count(user_id) from test_xinyan_reg group by device_id,os_id,app_id,client_version,from_id grouping sets ((device_id,os_id,app_id,client_version,from_id),(device_id,os_id,app_id,client_version),(device_id,os_id,app_id),(device_id,os_id),(device_id),());
Grouping_ID
当我们没有统计某一列时,它的值显示为null,这可能与列本身就有null值冲突,这就需要一种方法区分是没有统计还是值本来就是null。(grouping_id其实就是所统计各列二进制和)
Column1 (key) Column2 (value)
1 NULL
1 1
2 2
3 3
3
4
NULL
5
hsql: SELECT key, value, GROUPING__ID, count(*) from T1 GROUP BY key, value WITH ROLLUP
结果:
NULL NULL 0 00 6
1 NULL 1 10 2
1 NULL 3 11 1
1 1 3 11 1
2 NULL 1 10 1
2 2 3 11 1
3 NULL 1 10 2
3 NULL 3 11 1
3
4 4
3
NULL 5
3 11
1 10 3 11
1
1 1
GROUPING__ID转变为二进制,如果对应位上有值为null,说明这列本身值就是null。
大数据
2018-09-12 17:20:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
假设有下面的代码: class A(object): def p(self): print("A") class B(A): def p(self): print("B") super().p() class C(B): def p(self): print("C") super().p() class D(C): def p(self): print("D") super().p() class E(D): def p(self): print("E") super().p() c = E() c.p()
正常应该显示: E D C B A
但实际跳过了几个类,显示: E D A
经过排查发现在 D 中使用了下面的语句: super(B, self).p()
指定了父类,从而直接跳过了 C 和 B.
当然实际代码比这个要复杂,所以在一定程度上隐藏了问题.
大数据
2018-09-12 15:30:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
修改zeppelin-site.xml 的 ssl.port 端口 zeppelin.server.ssl.port 8091 Server ssl port. (used when ssl property is set to true)
大数据
2018-09-12 11:41:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. SQL操作 查看数据库 show databases; 创建数据库 create database {database}; 删除数据库 drop database [IF EXISTS] {database} [CASCADE]; 切换数据库 use {database}; 查看当前库的所有库 show tables [like '*'] [in database]; 创建表 create database.table(col_name datatype,...) 增加字段 ALTER TABLE tableName ADD COLUMNS (new_col data_type); 查看表结构 desc table; 插入数据 insert into table(column,...) values(data,...); 重命名表名 ALTER TABLE tableName RENAME TO newName; 删除表 drop table if exists tableName;
2. 导入数据 创建表时指定分割符: ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’ STORED AS TEXTFILE; 导入数据跳过首行/尾行: create table test( c1 int )row format delimited fields terminated by '\t' location '{HDFS_path}' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2") // 以\t为字段分隔符并在HDFS中创建外部表,跳过首行,跳过尾两行 从本地文件导入(相当于复制操作copy) load data local inpath 'filePath' into table tableName; 从HDFS导入(相当于移动操作move,源文件将不存在) load data inpath 'hdfsPath' into table tableName;
大数据
2018-09-12 11:13:00
「深度学习福利」大神带你进阶工程师,立即查看>>> hadoop2.7.2 链接: https://pan.baidu.com/s/1Mw2It3kvEbEIHyOYDcCZDQ 密码:i6sx tomcat8.5.31 链接: https://pan.baidu.com/s/12NyfC7MV26W2iaMRdrNJ1Q 密码:nxld jdk8u171 链接: https://pan.baidu.com/s/1tVg11y8UoUyYW33g5uAe-g 密码:t3uj 源数据结构 电话号码 上行流量 下行流量 输出文件数据结构 电话号码 上行流量 下行流量 总流量
Flow(实体类) import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; // 实现序列化接口 // 数据写入需要调用Writable接口 public class Flow implements Writable { // 如果出现其他构造方法,无参构造方法必须显示声明 public Flow() { } public Flow(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; } private long upFlow; private long downFlow; public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } // 输出格式改成最后需要显示的效果(可以减少后面的代码量) @Override public String toString() { // 在生成最后的结果文件时,可以使用该方法定义数据输出格式 return upFlow + "\t" + downFlow + "\t" + (upFlow + downFlow); } // 这个传参顺序需要与write方法中的顺序对应 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); } // 将参数存入实例中(需注意参数类型) @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); } }
map import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.sand.mr.bean.Flow; // LongWritable:偏移量 public class FlowCountMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String line = value.toString(); //根据数据格式进行拆分 String phoneNumber = line.split(",")[0]; String upFlow = line.split(",")[1]; String downFlow = line.split(",")[2]; //通过实例化flow对象存数据 Flow flow = new Flow(Long.parseLong(upFlow), Long.parseLong(downFlow)); // 数据写入 context.write(new Text(phoneNumber), flow); } }
reduce import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.sand.mr.bean.Flow; public class FlowCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { long upFlow = 0; long downFlow = 0; //通过加强for循环统计每个key值的上/下行流量以及总和(总和的计算方法已写入在Flow类中的tostring方法里) for (Flow flow : values) { upFlow += flow.getUpFlow(); downFlow += flow.getDownFlow(); } Flow flow = new Flow(upFlow, downFlow); context.write(key, flow); } }
master import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.sand.mr.bean.Flow; import com.sand.mr.mapper.FlowCountMapper; import com.sand.mr.reducer.FlowCountReducer; public class FlowCountMaster { public static void main(String[] args) throws Exception { // 接收参数 String inputPath = args[0]; String outputPut = args[1]; // 初始化配置 Configuration conf = new Configuration(); // 设置fs.defaultFS参数 conf.set("fs.defaultFS", "hdfs://HADOOP01:8020/"); // 初始化job参数,指定job名称 Job job = Job.getInstance(conf, "flowCount"); // 设置运行job的类 job.setJarByClass(FlowCountMaster.class); // 设置Mapper类 job.setMapperClass(FlowCountMapper.class); // 设置Reducer类 job.setReducerClass(FlowCountReducer.class); // 设置Map的输出数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Flow.class); // 设置Reducer的输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class); // 设置输入的路径 -> 可以指定为某一路径或具体的文件 FileInputFormat.setInputPaths(job, new Path(inputPath)); // 设置输出的路径 -> 最后一级路径自动生成,不会自动覆盖,需要手动修改 FileOutputFormat.setOutputPath(job, new Path(outputPut)); // 提交job boolean result = job.waitForCompletion(true); // 执行成功后进行后续操作 if (result) { System.out.println("Congratulations!"); } } }
大数据
2018-09-11 20:17:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
原来链接: 深入学习Redis(4):哨兵
哨兵的功能: 监控(Monitoring):哨兵会不断地检查主节点和从节点是否运作正常。 自动故障转移(Automatic failover):当主节点不能正常工作时,哨兵会开始自动故障转移操作,它会将失效主节点的其中一个从节点升级为新的主节点,并让其他从节点改为复制新的主节点。 配置提供者(Configuration provider):客户端在初始化时,通过连接哨兵来获得当前Redis服务的主节点地址。 通知(Notification):哨兵可以将故障转移的结果发送给客户端。
典型的哨兵架构图如下所示:
它由两部分组成,哨兵节点和数据节点: 哨兵节点:哨兵系统由一个或多个哨兵节点组成,哨兵节点是 特殊的redis节点 ,不存储数据。 数据节点:主节点和从节点都是数据节点。
部署:
哨兵系统:包含1个主节点、2个从节点和3个哨兵节点
主从节点配置: 主节点: port 6379 daemonize yes logfile "6379.log" dbfilename "dump-6379.rdb" 从节点1: port 6380 daemonize yes logfile "6380.log" dbfilename "dump-6380.rdb" slaveof 192.168.92.128 6379 从节点2: port 6381 daemonize yes logfile "6381.log" dbfilename "dump-6381.rdb" slaveof 192.168.92.128 6379
启动主从节点命令相同: redis-server redis-6379.conf redis-server redis-6380.conf redis-server redis-6381.conf
哨兵节点配置: port 26379 daemonize yes logfile "26379.log" sentinel monitor mymaster 192.168.92.128 6379 2
其中,sentinel monitor mymaster 192.168.92.128 6379 2 配置的含义是:该哨兵节点监控192.168.92.128:6379这个主节点,该主节点的名称是mymaster,最后的2的含义与主节点的故障判定有关:至少需要2个哨兵节点同意,才能判定主节点故障并进行故障转移。
启动哨兵节点两种方式: redis-sentinel sentinel-26379.conf redis-server sentinel-26379.conf --sentinel
哨兵系统的搭建过程,有几点需要注意: 哨兵系统中的主从节点,与普通的主从节点并没有什么区别,故障发现和转移是由哨兵来控制和完成的。 哨兵节点本质上是redis节点。 每个哨兵节点,只需要配置监控主节点,便可以自动发现其他的哨兵节点和从节点。 在哨兵节点启动和故障转移阶段,各个节点的配置文件会被重写(config rewrite)。 一个哨兵可以监控多个主节点,通过配置多条sentinel monitor即可实现。
大数据
2018-09-11 17:07:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
1. 功能介绍
同时在线打开两个版本的Word文档,切换显示其中的一个文档,或同时显示两个文档对比文档内容,实现在线的文档内容比较功能。
2. 如何实现word文件比较功能
Word软件本身提供两种了两个文件比较的功能。 打开Word文档后,单击功能区中的“审阅”标签,然后点击“比较”选项组中的“比较”按钮,进入“比较文档”窗口后,选择所要比较的“原文档”和“修订的文档”,将各项需要比较的数据设置好,按“确定”按钮,即可看两个文档的对比效果。 打开Word文档后,单击功能区中的“审阅”标签,然后点击“比较”选项组中的“合并”按钮,进入“合并文档”窗口后,选择所要比较的“原文档”和“修订的文档”,将各项需要比较的数据设置好,按“确定”按钮。完成后,即可看到修订的具体内容,文档中的痕迹就是两个文件的区别。
就两种文档对比的效果来说,第1种方式的效果更好一些,但是微软没有提供实现此功能的相应接口,无法二次开发实现同样的在线文档比较效果,所以PageOffice的在线word文件比较功能采用的是第2种方式。
(显示A文档)
(显示B文档)
(显示A与B两个文档的比较结果) 详细请参考PageOffice开发包中Samples4示例:
二、35、演示比较两个版本的Word文档的功能 (企业版)
大数据
2018-09-11 15:51:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
查看当前编码格式 locale LANG=en_US.utf8 LC_CTYPE="en_US.utf8" LC_NUMERIC="en_US.utf8" LC_TIME="en_US.utf8" LC_COLLATE="en_US.utf8" LC_MONETARY="en_US.utf8" LC_MESSAGES="en_US.utf8" LC_PAPER="en_US.utf8" LC_NAME="en_US.utf8" LC_ADDRESS="en_US.utf8" LC_TELEPHONE="en_US.utf8" LC_MEASUREMENT="en_US.utf8" LC_IDENTIFICATION="en_US.utf8" LC_ALL=
我这个是修改后的,编码格式为en_US.utf8支持中文
查看当前容器的语言环境 locale -a C POSIX
添加语言
如果当前语言环境中有en_US.utf8
我们直接添加就好 localedef -v -c -i en_US -f UTF-8 en_US.UTF-8
如果没有则需要安装(未验证) yum groupinstall chinese-support
修改配置文件
在/etc/profile中添加 LANG=en_US.UTF-8
然后 source /etc/profile
查看 C en_US.utf8 POSIX
语言环境里已经有了en_US.utf8
大数据
2018-09-11 15:12:00
「深度学习福利」大神带你进阶工程师,立即查看>>>

2018年还有1/4,3个月,接下来的目标 [英语,软考,跑步,代码技能,个人包装,代码量,python]
1. 每天打卡百词斩
2. 每周跑步15公里,每月60公里
3. 看完netty视频课程
4. 看完spring视频课程
5. 翻译Effective java条款,1个星期翻译1个条款
6. 软件设计师考试(2018年11月,2019年5月)
7. 写博客6篇,平均2个星期1篇
大数据
2018-09-11 15:05:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
原文连接: 深入学习Redis(3):主从复制
默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括: 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。 高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
主从复制的开启,完全是在从节点发起的;不需要我们在主节点做任何事情。
主从复制过程大体可以分为3个阶段:连接建立阶段(即准备阶段)、数据同步阶段、命令传播阶段;
1、 连接建立阶段
步骤1:保存主节点信息: masterhost和masterport。slaveof是异步命令,从节点完成主节点ip和port的保存后,向发送slaveof命令的客户端直接返回OK,实际的复制操作在这之后才开始进行。
步骤2:建立socket连接:
从节点每秒1次调用复制定时函数replicationCron(),如果发现了有主节点可以连接,便会根据主节点的ip和port,创建socket连接。如果连接成功,则:
从节点:为该socket建立一个专门处理复制工作的文件事件处理器,负责后续的复制工作,如接收RDB文件、接收命令传播等。
主节点:接收到从节点的socket连接后(即accept之后),为该socket创建相应的客户端状态, 并将从节点看做是连接到主节点的一个客户端,后面的步骤会以从节点向主节点发送命令请求的形式来进行。
步骤3:发送ping命令
从节点成为主节点的客户端之后,发送ping命令进行首次请求,目的是:检查socket连接是否可用,以及主节点当前是否能够处理请求。
从节点发送ping命令后,可能出现3种情况: 返回pong:说明socket连接正常,且主节点当前可以处理请求,复制过程继续。 超时:一定时间后从节点仍未收到主节点的回复,说明socket连接不可用,则从节点断开socket连接,并重连。 返回pong以外的结果:如果主节点返回其他结果,如正在处理超时运行的脚本,说明主节点当前无法处理命令,则从节点断开socket连接,并重连。
步骤4:身份验证
如果从节点中设置了masterauth选项,则从节点需要向主节点进行身份验证;没有设置该选项,则不需要验证。从节点进行身份验证是通过向主节点发送auth命令进行的,auth命令的参数即为配置文件中的masterauth的值。
如果主节点设置密码的状态,与从节点masterauth的状态一致(一致是指都存在,且密码相同,或者都不存在),则身份验证通过,复制过程继续;如果不一致,则从节点断开socket连接,并重连。
步骤5:发送从节点端口信息
身份验证之后,从节点会向主节点发送其监听的端口号(前述例子中为6380),主节点将该信息保存到该从节点对应的客户端的slave_listening_port字段中;该端口信息除了在主节点中执行info Replication时显示以外,没有其他作用。
2、数据同步阶段
主从节点之间的连接建立以后,便可以开始进行数据同步,该阶段可以理解为从节点数据的初始化。具体执行的方式是:从节点向主节点发送psync命令(Redis2.8以前是sync命令),开始同步。
数据同步阶段是主从复制最核心的阶段,根据主从节点当前状态的不同,可以分为全量复制和部分复制。
需要注意的是,在数据同步阶段之前, 从节点是主节点的客户端 ,主节点不是从节点的客户端;而到了这一阶段及以后, 主从节点互为客户端 。原因在于:在此之前,主节点只需要响应从节点的请求即可,不需要主动发请求,而在数据同步阶段和后面的命令传播阶段,主节点需要主动向从节点发送请求(如推送缓冲区中的写命令),才能完成复制。
3. 命令传播阶段
数据同步阶段完成后,主从节点进入命令传播阶段;在这个阶段主节点将自己执行的写命令发送给从节点,从节点接收命令并执行,从而保证主从节点数据的一致性。
在命令传播阶段,除了发送写命令,主从节点还维持着心跳机制:PING和REPLCONF ACK。
需要注意的是,命令传播是异步的过程,即主节点发送写命令后并不会等待从节点的回复;因此实际上主从节点之间很难保持实时的一致性,延迟在所难免。
【数据同步阶段】全量复制和部分复制 全量复制:用于初次复制或其他无法进行部分复制的情况,将主节点中的所有数据都发送给从节点,是一个非常重型的操作。 部分复制:用于网络中断等情况后的复制,只将中断期间主节点执行的写命令发送给从节点,与全量复制相比更加高效。需要注意的是,如果网络中断时间过长,导致主节点没有能够完整地保存中断期间执行的写命令,则无法进行部分复制,仍使用全量复制。
全量复制的过程 从节点判断无法进行部分复制,向主节点发送全量复制的请求;或从节点发送部分复制的请求,但主节点判断无法进行全量复制;具体判断过程需要在讲述了部分复制原理后再介绍。 主节点收到全量复制的命令后,执行bgsave,在后台生成RDB文件, 并使用一个缓冲区(称为复制缓冲区) 记录从现在开始执行的所有写命令 主节点的bgsave执行完成后,将RDB文件发送给从节点; 从节点首先清除自己的旧数据,然后载入接收的 RDB 文件 ,将数据库状态更新至主节点执行bgsave时的数据库状态 主节点将前述复制缓冲区中的所有写命令发送给从节点,从节点执行这些写命令,将数据库状态更新至主节点的最新状态 如果从节点开启了AOF,则会触发bgrewriteaof的执行,从而保证AOF文件更新至主节点的最新状态
通过全量复制的过程可以看出,全量复制是非常重型的操作: 主节点通过bgsave命令fork子进程进行RDB持久化,该过程是非常消耗CPU、内存(页表复制)、硬盘IO的; 主节点通过网络将RDB文件发送给从节点,对主从节点的带宽都会带来很大的消耗 从节点清空老数据、载入新RDB文件的过程是阻塞的,无法响应客户端的命令;如果从节点执行bgrewriteaof,也会带来额外的消耗
部分复制,三个概念:
(1)复制偏移量
主节点和从节点分别维护一个复制偏移量(offset),代表的是 主节点向从节点传递的字节数 ;主节点每次向从节点传播N个字节数据时,主节点的offset增加N;从节点每次收到主节点传来的N个字节数据时,从节点的offset增加N。
offset用于判断主从节点的数据库状态是否一致:如果二者offset相同,则一致;如果offset不同,则不一致,此时可以根据两个offset找出从节点缺少的那部分数据。例如,如果主节点的offset是1000,而从节点的offset是500,那么部分复制就需要将offset为501-1000的数据传递给从节点。而offset为501-1000的数据存储的位置,就是下面要介绍的复制积压缓冲区。
(2)复制积压缓冲区
复制积压缓冲区是由主节点维护的、固定长度的、先进先出(FIFO)队列,默认大小1MB;当主节点开始有从节点时创建,其作用是备份主节点最近发送给从节点的数据。注意,无论主节点有一个还是多个从节点,都只需要一个复制积压缓冲区。
在命令传播阶段,主节点除了将写命令发送给从节点,还会发送一份给复制积压缓冲区,作为写命令的备份;除了存储写命令,复制积压缓冲区中还存储了其中的每个字节对应的复制偏移量(offset)。由于复制积压缓冲区定长且是先进先出,所以它保存的是主节点最近执行的写命令;时间较早的写命令会被挤出缓冲区。
由于该缓冲区长度固定且有限,因此可以备份的写命令也有限,当主从节点offset的差距过大超过缓冲区长度时,将无法执行部分复制,只能执行全量复制。反过来说,为了提高网络中断时部分复制执行的概率,可以根据需要增大复制积压缓冲区的大小(通过配置repl-backlog-size);例如如果网络中断的平均时间是60s,而主节点平均每秒产生的写命令(特定协议格式)所占的字节数为100KB,则复制积压缓冲区的平均需求为6MB,保险起见,可以设置为12MB,来保证绝大多数断线情况都可以使用部分复制。
从节点将offset发送给主节点后,主节点根据offset和缓冲区大小决定能否执行部分复制: 如果offset偏移量之后的数据,仍然都在复制积压缓冲区里,则执行部分复制; 如果offset偏移量之后的数据已不在复制积压缓冲区中(数据已被挤出),则执行全量复制。
(3)服务器运行ID(runid)
每个Redis节点(无论主从),在启动时都会自动生成一个随机ID(每次启动都不一样),由40个随机的十六进制字符组成;runid用来唯一识别一个Redis节点。通过info Server命令,可以查看节点的runid:
主从节点初次复制时,主节点将自己的runid发送给从节点,从节点将这个runid保存起来;当断线重连时,从节点会将这个runid发送给主节点;主节点根据runid判断能否进行部分复制: 如果从节点保存的runid与主节点现在的runid相同,说明主从节点之前同步过,主节点会继续尝试使用部分复制(到底能不能部分复制还要看offset和复制积压缓冲区的情况); 如果从节点保存的runid与主节点现在的runid不同,说明从节点在断线前同步的Redis节点并不是当前的主节点,只能进行全量复制。
redis读写分离,可以实现redis的读负载均衡。
主从复制延迟与不一致问题的优化措施:优化主从节点之间的网络环境(如在同机房部署);监控主从节点延迟(通过offset)判断,如果从节点延迟过大,通知应用不再通过该从节点读取数据;使用集群同时扩展写负载和读负载等。
复制中断问题
主从节点超时是复制中断的原因之一,除此之外,还有其他情况可能导致复制中断,其中最主要的是复制缓冲区溢出问题。
复制缓冲区溢出
前面曾提到过,在全量复制阶段,主节点会将执行的写命令放到复制缓冲区中,该缓冲区存放的数据包括了以下几个时间段内主节点执行的写命令:bgsave生成RDB文件、RDB文件由主节点发往从节点、从节点清空老数据并载入RDB文件中的数据。当主节点数据量较大,或者主从节点之间网络延迟较大时,可能导致该缓冲区的大小超过了限制,此时主节点会断开与从节点之间的连接;这种情况可能引起全量复制->复制缓冲区溢出导致连接中断->重连->全量复制->复制缓冲区溢出导致连接中断……的循环。解决办法:增大复制缓冲区
大数据
2018-09-11 11:12:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
perceptron:
激活函数 感知机模型:f(x)=sign(w*x+b) import numpy as np class Perceptron: """ This class models an artificial neuron with step activation function. """ def __init__(self, weights = np.array([1]), threshold = 0): self.weights = weights.astype(float) self.threshold = threshold def activate(self, values): strength = np.dot(values,self.weights) return int(strength > self.threshold) def update(self, values, train, eta=.1): for data_point in xrange(len(values)): prediction = self.activate(values[data_point]) error = train[data_point] - prediction weight_update = values[data_point]*error*eta# TODO self.weights += weight_update def test(): def sum_almost_equal(array1, array2, tol = 1e-6): return sum(abs(array1 - array2)) < tol p1 = Perceptron(np.array([1,1,1]),0) p1.update(np.array([[2,0,-3]]), np.array([1])) assert sum_almost_equal(p1.weights, np.array([1.2, 1, 0.7])) p2 = Perceptron(np.array([1,2,3]),0) p2.update(np.array([[3,2,1],[4,0,-1]]),np.array([0,0])) assert sum_almost_equal(p2.weights, np.array([0.7, 1.8, 2.9])) p3 = Perceptron(np.array([3,0,2]),0) p3.update(np.array([[2,-2,4],[-1,-3,2],[0,2,1]]),np.array([0,1,0])) assert sum_almost_equal(p3.weights, np.array([2.7, -0.3, 1.7])) if __name__ == "__main__": test()
sigmoid:
激活函数:sigmoid:1/(1+exp(x))
感知机与logistic regression的差别就是感知机激活函数是sign,logistic regression的激活函数是sigmoid 逻辑回归模型:f(x)= sigmoid(w*x+b) import numpy as np class Sigmoid: def __init__(self, weights = np.array([1])): self.weights = weights self.last_input = 0 # strength of last input self.delta = 0 # error signal def activate(self, values): strength = np.dot(values, self.weights) self.last_input = strength result = 1/(1+np.exp(-self.last_input)) return result def update(self, values, train, eta=.1): for X, y_true in zip(values, train): y_pred = self.activate(X) error = y_true - y_pred dx = 1/(1+np.exp(-self.last_input))*(1-1/(1+np.exp(-self.last_input))) dw = eta*error*dx*X self.weights += dw def test(): def sum_almost_equal(array1, array2, tol = 1e-5): return sum(abs(array1 - array2)) < tol u1 = Sigmoid(weights=[3,-2,1]) assert abs(u1.activate(np.array([1,2,3])) - 0.880797) < 1e-5 u1.update(np.array([[1,2,3]]),np.array([0])) assert sum_almost_equal(u1.weights, np.array([2.990752, -2.018496, 0.972257])) u2 = Sigmoid(weights=[0,3,-1]) u2.update(np.array([[-3,-1,2],[2,1,2]]),np.array([1,0])) assert sum_almost_equal(u2.weights, np.array([-0.030739, 2.984961, -1.027437])) if __name__ == "__main__": test()
大数据
2018-09-04 10:39:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
直接上例子 假设班级里有男生60个,女生40个,,,现在检索系统要找出所有女生。假设系统找出了50个人认为是女生,其中实际上男生20个人,女生30个。 准确率(Accuracy):为系统将男生女生分类正确的统计 A = [30 +(60-20)]/100 精确率(Precision)也叫查准率:为检索出正确的个数 / 检索出的总个数 P = 30 /50 召回率(recall)也叫查全率:为检索出正确的个数 / 应该检索出的总个数 R = 30 / 40 F1-measure:为精确值和召回率的调和均值,也就是 F1 =2/( 50/30 + 40/30)=2/3
注意:精确率、召回率、F1都是针对某一类而计算的。
**ps:**​ P和R指标有的时候是矛盾的,那么有没有办法综合考虑他们呢?想方法肯定是有很多的,最常见的方法应该就是F1-Measure了,有些地方也叫做F-Score,其实都是一样的。F-Measure是Precision和Recall加权调和平均
大数据
2018-09-04 10:29:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
数据分析基础——基本运算
x=[n*m]型矩阵数据,n是佯本数,m是变量维数;
*********************************基本公式************************************
均值:每一列的均值 mean(xj)=(xj1+xj2+...+xjn)/n
方差:计算每一列的方差 var(xj)=[(xj1-mean(xj))^2+(xj2-mean(xj))^2+...+(xjn-mean(xj))^2]/n
标准差:方差的开方 std=var(xj)^1/2
协方差:变量xj和xk之间协方差 cov(xj, xk)=[(xj1-mean(xj))*(xk1-mean(xk))+...+(xjn-mean(xj))*(xkn-mean(xk))]/n
相关系数:变量xj和xk之间的相关系数 ,反映两个变量的相似程度0~1; r(xj , xk)=cov(xj , xk)/[std(xj)*std(xk)]
向量内积:变量x和y之间的内积 (x1y1+x2y2+...+xmym)
向量x,y之间的夹角:内积/(模x*模y) (x1y1+x2y2+...+xmym)/ [(x1^2+x2^2+...+xm^2)^1/2 * (y1^2+y2^2+...+ym^2)^1/2]
内积和夹角之间的关系
***********************************基本处理********************************
数据中心化:均值为0,中心在原点 Xij=xij-mean(xj)
数据无量纲化:各种无量纲方法,对数据压缩 Xij=xij/std(xj)
Xij=xij/max(xj)
Xij=xij/min(xj)
Xij=xij/mean(xj)
Xij=xij/[max(xj)-min(xj)]
数据归一化:其实和上面无量纲一个意思,对数据中心化和压缩 Xij=[xij-min(xj)]/[max(xj)-min(xj)]
数据标准化:标准化是对数据每列佯本数进行标准化,均值为0,方差为1;标准化的目的是对数据进行中心化和压缩 Xij=(xij-mean(xj))/std(xj)
变量归一化(单位圆化):对每个佯本进行归一化(变量间归一化),中心在原点,距离为1;注意这里是对每一行进行归一化,每个佯本的模为1; Xij=xij/(xi1^2+xi2^2+...+xim^2)^1/2
说明 :因该针对不同的问题,采用不同数据处理方法,不能盲目用以上方法,比如:归一化是对列还是对行进行处理,需要看数据针对的问题。
心得: 单位圆化后的两个向量的夹角=其内积
补充: 测定系数R^2(多元回归中叫复测定系数),对回归方程的一个评价指标。(参考偏最小二乘回归的线性与非线性方法(书.王惠文))




大数据
2018-09-04 10:23:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
产品概述:
推荐引擎(Recommendation Engine,以下简称RecEng,特指阿里云推荐引擎)是在阿里云计算环境下建立的一套推荐服务框架,目标是让广大中小互联网企业能够在这套框架上快速的搭建满足自身业务需求的推荐服务。
课程链接: 阿里云推荐引擎使用教程
推荐服务通常由三部分组成:日志采集,推荐计算和产品对接。推荐服务首先需要采集产品中记录的用户行为日志到离线存储,然后在离线环境下利用推荐算法进行用户和物品的匹配计算,找出每个用户可能感兴趣的物品集合后,将这些预先计算好的结果推送到在线存储上,最终产品在有用户访问时通过在线API向推荐服务发起请求,获得该用户可能感兴趣的物品,完成推荐业务。
RecEng的核心是推荐算法的定制。RecEng为推荐业务定义了一套完整的规范,从输入,到计算,到输出,客户可以在这个框架下自定义算法和规则,以此满足各种行业的需求,包括电商,音乐,视频,社交,新闻,阅读等。同时,RecEng也提供了相应的方法供客户便捷的接入用户访问日志,以及自定义满足其自身业务需求的在线API。
基本概念:
客户/租户(org/tenant)
指RecEng的使用者,系统中由其阿里云账号代表。通常客户是一个组织,RecEng中常用org表示客户。
用户(user)
指客户的用户,即RecEng使用者的用户。推荐是一个2C的服务,使用推荐服务的客户必然有其自己的用户,RecEng使用者的用户简称为“用户”,系统中常用user表示用户。
物品(item)
指被推荐给用户的内容,可以是商品,也可以是歌曲,视频等其他内容,系统中常用item表示物品。
业务(biz)
业务针对数据集定义,定义了算法所能使用的数据范围。一个客户在RecEng上可以有多个业务,不同的业务必然有不同的数据集。RecEng要求每个业务提供四类数据(不要求全部提供):用户数据,物品数据,用户行为数据,推荐效果数据。每一组这样的数据就构成一个业务。系统中常用biz表示业务。
比如某客户A有两类被推荐的物品,分别是视频和歌曲,于是客户A可以在RecEng上建立两个业务M和N,其中M的物品数据为视频,N的物品数据为歌曲,其他的数据(指用户数据,用户行为数据等)可以都相同。在这种方案下,业务M和N的数据是独立的,即业务M虽然能看到用户对于歌曲的行为,但是业务M中不包含歌曲的物品数据,所以会丢弃用户对于歌曲的行为;如果业务M中某用户只对歌曲有行为,对视频没有行为,业务M也会丢弃这类用户。反之对业务N亦然。
一个业务最好只推荐一类物品。多类物品的推荐在后续的行业模板会有支持,需要引入板块(plate)的概念,一份业务数据可以生成多个板块的数据集,场景绑定某个板块进行推荐算法计算。
场景(scn)
场景指的是推荐的上下文,每个场景都会输出一个API,场景由推荐时可用的参数决定。有两种场景最为常见,分别是首页推荐场景和详情页推荐场景。顾名思义,在执行首页推荐时,可用的参数只有用户信息;而在执行详情页推荐时,可用的参数除了用户信息,还包括当前详情页上所展示的物品信息。系统中常用scn表示场景。
一个业务可以包含多个场景,即对于某个业务A,它包含多个首页场景也是完全可以的。
事实上,回到场景的原始定义,场景只是由推荐的上下文决定,客户完全可以根据自己的需求建立全新的场景,比如针对搜索关键词的推荐场景,这时可用的参数除了用户信息,还有用户所输入的关键词。
流程(flow)
算法流程指数据端到端的处理流程,一部分流程属于业务范畴,如数据导入流程,效果计算流程,数据质量分计算流程;一部分属于场景,比如场景算法流程。从数据源类型和产出来划分,又分为离线流程,近线流程,在线流程 离线流程
一般情况下,离线流程的输入和输出都是MaxCompute(原ODPS)表,所以离线数据规范其实上是一组MaxCompute表的格式规范,包括接入数据、中间数据和输出数据三类数据的格式规范。接入数据指客户离线提供的用户、物品、日志等数据,中间数据是在离线算法流程中产生的各种中间性质的结果数据表,输出数据是指推荐结果数据表,该结果最终将会被导入到在线存储中,供在线计算模块使用。 近线流程
推荐引擎的的近线流程主要处理用户行为发生变化、推荐物品发生更新时,对离线推荐结果进行更新。不像离线算法,天然以MaxCompute(原ODPS)表作为输入和输出,近线程序的输入数据可以来自多个数据源,如在线的表格存储(原OTS),以及用户的API请求,又或者是程序中的变量;输出可以是程序变量,或者写回在线存储,或者返回给用户。出于安全性考虑,推荐引擎提供了一组SDK供客户自定义在线代码读写在线存储(Table Store),不允许直接访问,所以需要定义每类在线存储的别名和格式。对于需要频繁使用的在线数据,无论其来自在线存储还是用户的API请求,RecEng会预先读好,保存在在线程序的变量中,客户自定义代码可以直接读写这些变量中的数据。 在线流程
推荐引擎的的在线流程负责的任务是推荐API接收到API请求时,实时对离线和近线修正产生的推荐结果进行过滤、排重、补足等处理;后者主要处理用户行为发生变化、推荐物品发生更新时,对离线推荐结果进行更新
一个场景只包含一个离线流程和一个近线流程,可以包含多个在线流程,用于支持A/BTest。
算法策略(Algorithm Strategy)
算法策略定义了一套离线/近线流程。并且透出相关的算法参数,帮助客户构建自己的算法流程。一个场景可以配置多个算法策略,最终会合并执行,产出一系列推荐候选集和过滤集,在线流程通过引用这些候选集来完成个性化推荐。
作业/任务(task)
作业指运行中的离线流程实例,作业和离线流程的关系完全等同于进程和程序的关系。每个作业都是不可重入的,即对每个离线流程,同一时间只允许运行一份实例。作业直接存在上下游关系,如果上游作业失败,下游任务也会被取消。
更多精品课程:
阿里云大学官网( 阿里云大学 - 官方网站,云生态下的创新人才工场 )
大数据
2018-09-07 13:16:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
打开zeppelin interpreter界面
新建hive interpreter
配置hive相关属性
default.driver 和 default.url(hiveserver)是必须配置的,当然我们也可以在url中指定队列 jdbc:hive2://11.84.15.36:8088?mapreduce.job.queuename=队列名
配置jar包依赖
hive-jdbc-1.2.1-standalone.jar 和 hadoop-common-2.6.4.jar
连网环境下可以指定依赖,离线环境下,我们需要将这些jar 放入到zeppelin安装目录的interperter里
这样我们就可以开始使用hive了
大数据
2018-09-07 11:49:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
docker 安装好后默认image存储目录在 /var/lib/docker 目录下,但是通常这个目录挂载的空间很小,所以我们在安装好 docker 后要注意修改 image 存储目录
一、查看当前目录 docker info Containers: 6 Images: 27 Storage Driver: devicemapper Pool Name: docker-8:17-34377337422-pool Pool Blocksize: 65.54 kB Backing Filesystem: xfs Data file: /dev/loop0 Metadata file: /dev/loop1 Data Space Used: 15.5 GB Data Space Total: 107.4 GB Data Space Available: 91.87 GB Metadata Space Used: 12.78 MB Metadata Space Total: 2.147 GB Metadata Space Available: 2.135 GB Udev Sync Supported: true Deferred Removal Enabled: false Data loop file: /home/docker/devicemapper/devicemapper/data Metadata loop file: /home/docker/devicemapper/devicemapper/metadata Library Version: 1.02.107-RHEL7 (2015-10-14) Execution Driver: native-0.2 Logging Driver: json-file Kernel Version: 3.10.0-514.16.1.el7.x86_64 Operating System: CentOS Linux 7 (Core) CPUs: 48 Total Memory: 251.7 GiB Name: map-trafficft-train03.nmg01 ID: EZPC:SQMC:3OIZ:QNRV:IWXW:TMEI:TLYD:2TVK:OKBK:TVCI:KPRQ:AZPE
当然我这个是修改后的,修改后为、home/docker目录下
二、备份当前已有镜像 docker save -o 文件名.tar 镜像名
三、修改目录
查找配置文件
不同安装方式,每个版本配置文件好像不一样,有的是docker.service,有的是docker。我们可以先搜索一下docker.service,如果能搜索到则按照docker.service修改方法修改。我是没有搜索到docker.service,所以我搜索的是docker: find . -name docker ./run/docker ./run/lock/subsys/docker ./home/docker ./home/docker/devicemapper/mnt/85e3a46c4424073ff5eac7c0f59916480b7857996ea247f895f133fbc61ba63c/rootfs/opt/ansible/ansible/lib/ansible/modules/cloud/docker ./home/docker/devicemapper/mnt/85e3a46c4424073ff5eac7c0f59916480b7857996ea247f895f133fbc61ba63c/rootfs/opt/ansible/ansible/test/units/modules/cloud/docker ./home/docker/devicemapper/mnt/85e3a46c4424073ff5eac7c0f59916480b7857996ea247f895f133fbc61ba63c/rootfs/opt/ansible/ansible/test/utils/docker ./home/docker/devicemapper/mnt/85e3a46c4424073ff5eac7c0f59916480b7857996ea247f895f133fbc61ba63c/rootfs/opt/ansible/ansible/test/integration/targets/docker ./home/docker/devicemapper/mnt/d86b865a73dd83d9972608816fbfeb7b7c9ac701764e2939f9d4056821584319/rootfs/usr/lib/python2.7/site-packages/docker ./home/docker/devicemapper/mnt/d86b865a73dd83d9972608816fbfeb7b7c9ac701764e2939f9d4056821584319/rootfs/opt/ansible/ansible/lib/ansible/modules/cloud/docker ./home/docker/devicemapper/mnt/d86b865a73dd83d9972608816fbfeb7b7c9ac701764e2939f9d4056821584319/rootfs/opt/ansible/ansible/test/units/modules/cloud/docker ./home/docker/devicemapper/mnt/d86b865a73dd83d9972608816fbfeb7b7c9ac701764e2939f9d4056821584319/rootfs/opt/ansible/ansible/test/utils/docker ./home/docker/devicemapper/mnt/d86b865a73dd83d9972608816fbfeb7b7c9ac701764e2939f9d4056821584319/rootfs/opt/ansible/ansible/test/integration/targets/docker ./home/docker/devicemapper/mnt/5689c9844ad255f7ed1a65d4e32897cc760d8095e1f28f07256c05a25d990f8c/rootfs/usr/lib/python2.7/site-packages/docker ./home/docker/devicemapper/mnt/5689c9844ad255f7ed1a65d4e32897cc760d8095e1f28f07256c05a25d990f8c/rootfs/opt/ansible/ansible/lib/ansible/modules/cloud/docker ./home/docker/devicemapper/mnt/5689c9844ad255f7ed1a65d4e32897cc760d8095e1f28f07256c05a25d990f8c/rootfs/opt/ansible/ansible/test/units/modules/cloud/docker ./home/docker/devicemapper/mnt/5689c9844ad255f7ed1a65d4e32897cc760d8095e1f28f07256c05a25d990f8c/rootfs/opt/ansible/ansible/test/utils/docker ./home/docker/devicemapper/mnt/5689c9844ad255f7ed1a65d4e32897cc760d8095e1f28f07256c05a25d990f8c/rootfs/opt/ansible/ansible/test/integration/targets/docker ./home/docker/devicemapper/mnt/c499fe8367d97af8a9ba8efbdc8626ef047053004eee3ca8f6450ca9507327f7/rootfs/usr/lib/python2.7/site-packages/docker ./home/docker/devicemapper/mnt/c499fe8367d97af8a9ba8efbdc8626ef047053004eee3ca8f6450ca9507327f7/rootfs/opt/ansible/ansible/lib/ansible/modules/cloud/docker ./home/docker/devicemapper/mnt/c499fe8367d97af8a9ba8efbdc8626ef047053004eee3ca8f6450ca9507327f7/rootfs/opt/ansible/ansible/test/units/modules/cloud/docker ./home/docker/devicemapper/mnt/c499fe8367d97af8a9ba8efbdc8626ef047053004eee3ca8f6450ca9507327f7/rootfs/opt/ansible/ansible/test/utils/docker ./home/docker/devicemapper/mnt/c499fe8367d97af8a9ba8efbdc8626ef047053004eee3ca8f6450ca9507327f7/rootfs/opt/ansible/ansible/test/integration/targets/docker ./etc/docker ./etc/sysconfig/docker ./etc/rc.d/init.d/docker ./var/log/docker ./var/lib/docker ./usr/share/bash-completion/completions/docker ./usr/bin/docker ./usr/libexec/docker
配置文件就是/etc/sysconfig/docker
修改配置文件
在配置文件中将other_args赋值为你想存储的目录,注意一定要带引号 # /etc/sysconfig/docker # # Other arguments to pass to the docker daemon process # These will be parsed by the sysv initscript and appended # to the arguments list passed to docker -d other_args="--graph=/home/docker" DOCKER_CERT_PATH=/etc/docker # Resolves: rhbz#1176302 (docker issue #407) DOCKER_NOWARN_KERNEL_VERSION=1 # Location used for temporary files, such as those created by # # docker load and build operations. Default is /var/lib/docker/tmp # # Can be overriden by setting the following environment variable. # # DOCKER_TMPDIR=/var/tmp
四、重启docker service docker restart
五、查看docker image 存储目录 docker info
六、将image load进来 docker load -i [docker备份文件.tar]
七、删除之前docker imaga目录
大数据
2018-09-07 11:07:00
「深度学习福利」大神带你进阶工程师,立即查看>>> 通常当需要处理的数据量超过了单机尺度(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算,有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这时我们也可以选择利用spark集群强大的计算资源,并行化地计算,其架构示意图如下: Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的 Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。 Spark Streaming:对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据 MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。 GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作 Spark架构的组成图如下:
大数据
2018-09-06 15:55:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
BeautifulSoup
1.什么是BeautifulSoup
灵活又方便的网页解析库,处理高效,支持多种解析器。利用它不用编写正则表达式即可方便地实现网页信息的提取
2.安装BeautifulSoup pip3 install lxml pip3 install BeautifulSoup4
3.解析库
解析器 使用方法 优势 劣势
Python标准库 BeautifulSoup(markup, "html.parser") Python的内置标准库、执行速度适中 、文档容错能力强 Python 2.7.3 or 3.2.2)前的版本中文容错能力差
lxml HTML 解析器
lxml XML 解析器 html5lib
BeautifulSoup(markup, "lxml")
BeautifulSoup(markup, "xml") BeautifulSoup(markup, "html5lib")
速度快、文档容错能力强
速度快、唯一支持XML的解析器 最好的容错性、以浏览器的方式解析文档、生成HTML5格式的文档
需要安装C语言库
需要安装C语言库 速度慢、不依赖外部扩展
基本使用 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.prettify()) print(soup.title.string)
标签选择器
1.选择元素 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.title) print(type(soup.title)) print(soup.head) print(soup.p)
2.获取名称 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.title.name)
3.获取属性 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.p.attrs['name']) print(soup.p['name'])
4.获取内容 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.p.string)
5.嵌套选择 html = """ The Dormouse's story

The Dormouse's story

Once upon a time there were three little sisters; and their names were , Lacie and Tillie; and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.head.title.string)
6.子节点和子孙节点 html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.p.contents) html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.p.children) for i, child in enumerate(soup.p.children): print(i, child) html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.p.descendants) for i, child in enumerate(soup.p.descendants): print(i, child)
7.父节点和祖先节点 html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.a.parent) html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(list(enumerate(soup.a.parents)))
8.兄弟节点 html = """ The Dormouse's story

Once upon a time there were three little sisters; and their names were Elsie Lacie and Tillie and they lived at the bottom of a well.

...

""" from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(list(enumerate(soup.a.next_siblings))) print(list(enumerate(soup.a.previous_siblings)))
标准选择器
1.find返回单个元素,find_all返回所有元素 find( name , attrs , recursive , text , **kwargs ) html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.find('ul')) print(type(soup.find('ul'))) print(soup.find('page')) find_all( name , attrs , recursive , text , **kwargs )
2.可根据标签名、属性、内容查找文档
name html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.find_all('ul')) print(type(soup.find_all('ul')[0])) html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') for ul in soup.find_all('ul'): print(ul.find_all('li'))
attrs html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.find_all(attrs={'id': 'list-1'})) print(soup.find_all(attrs={'name': 'elements'})) html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.find_all(id='list-1')) print(soup.find_all(class_='element'))
text html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.find_all(text='Foo'))
3.find_parents() find_parent()
find_parents()返回所有祖先节点,find_parent()返回父节点。
4.find_next_siblings() find_next_sibling()
find_next_siblings()返回后面所有兄弟节点,find_next_sibling()返回后面第一个兄弟节点。
5.find_previous_siblings() find_previous_sibling()
find_previous_siblings()返回前面所有兄弟节点,find_previous_sibling()返回前面第一个兄弟节点。
6.find_all_next() find_next()
find_all_next()返回节点后所有符合条件的节点, find_next()返回节点后第一个符合条件的节点
7.find_all_previous() 和 find_previous()
find_all_previous()返回节点前所有符合条件的节点, find_previous()返回节点前第一个符合条件的节点
CSS选择器
1.通过select()直接传入CSS选择器即可完成选择 html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') print(soup.select('.panel .panel-heading')) print(soup.select('ul li')) print(soup.select('#list-2 .element')) print(type(soup.select('ul')[0])) html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') for ul in soup.select('ul'): print(ul.select('li'))
2.获取属性 html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') for ul in soup.select('ul'): print(ul['id']) print(ul.attrs['id'])
3.获取内容 html='''

Hello

  • Foo
  • Bar
  • Jay
  • Foo
  • Bar
''' from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'lxml') for li in soup.select('li'): print(li.get_text())
总结思考
1.推荐使用lxml解析库,必要时使用html.parser
2.标签选择筛选功能弱但是速度快
3.建议使用find()、find_all() 查询匹配单个结果或者多个结果
4.如果对CSS选择器熟悉建议使用select()
5.记住常用的获取属性和文本值的方法
大数据
2018-09-05 15:00:00
「深度学习福利」大神带你进阶工程师,立即查看>>>
Requests库介绍
1.什么是Requests库
Requests使用Python语言编写的,基于urllib,采用Apache2 Licensed开源协议的HTTP库。它比urllib更加方便,可以节约我们大量的工作,完全满足HTTP测试需求。一句话——Python实现的简单易用的HTTP库
2.安装 pip3 install requests
实例引入 import requests response = requests.get('https://www.baidu.com/') print(type(response)) print(response.status_code) print(type(response.text)) print(response.text) print(response.cookies)
各种请求方式 import requests requests.post('http://httpbin.org/post') requests.put('http://httpbin.org/put') requests.delete('http://httpbin.org/delete') requests.head('http://httpbin.org/get') requests.options('http://httpbin.org/get')
请求
1.基本GET请求
①基本写法 import requests response = requests.get('http://httpbin.org/get') print(response.text)
②带参数GET请求 import requests response = requests.get("http://httpbin.org/get?name=germey&age=22") print(response.text) import requests data = { 'name': 'germey', 'age': 22 } response = requests.get("http://httpbin.org/get", params=data) print(response.text)
③解析json import requests import json response = requests.get("http://httpbin.org/get") print(type(response.text)) print(response.json()) print(json.loads(response.text)) print(type(response.json()))
④获取二进制数据 import requests response = requests.get("https://github.com/favicon.ico") print(type(response.text), type(response.content)) print(response.text) print(response.content) import requests response = requests.get("https://github.com/favicon.ico") with open('favicon.ico', 'wb') as f: f.write(response.content) f.close()
⑤添加headers import requests response = requests.get("https://www.zhihu.com/explore") print(response.text) import requests headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36' } response = requests.get("https://www.zhihu.com/explore", headers=headers) print(response.text)
2.基本POST请求 import requests data = {'name': 'germey', 'age': '22'} response = requests.post("http://httpbin.org/post", data=data) print(response.text) import requests data = {'name': 'germey', 'age': '22'} headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36' } response = requests.post("http://httpbin.org/post", data=data, headers=headers) print(response.json())
响应
1.reponse属性 import requests response = requests.get('http://www.jianshu.com') print(type(response.status_code), response.status_code) print(type(response.headers), response.headers) print(type(response.cookies), response.cookies) print(type(response.url), response.url) print(type(response.history), response.history)
2.状态码判断 import requests response = requests.get('http://www.jianshu.com/hello.html') exit() if not response.status_code == requests.codes.not_found else print('404 Not Found') import requests response = requests.get('http://www.jianshu.com') exit() if not response.status_code == 200 else print('Request Successfully') 100: ('continue',), 101: ('switching_protocols',), 102: ('processing',), 103: ('checkpoint',), 122: ('uri_too_long', 'request_uri_too_long'), 200: ('ok', 'okay', 'all_ok', 'all_okay', 'all_good', '\\o/', '✓'), 201: ('created',), 202: ('accepted',), 203: ('non_authoritative_info', 'non_authoritative_information'), 204: ('no_content',), 205: ('reset_content', 'reset'), 206: ('partial_content', 'partial'), 207: ('multi_status', 'multiple_status', 'multi_stati', 'multiple_stati'), 208: ('already_reported',), 226: ('im_used',), # Redirection. 300: ('multiple_choices',), 301: ('moved_permanently', 'moved', '\\o-'), 302: ('found',), 303: ('see_other', 'other'), 304: ('not_modified',), 305: ('use_proxy',), 306: ('switch_proxy',), 307: ('temporary_redirect', 'temporary_moved', 'temporary'), 308: ('permanent_redirect', 'resume_incomplete', 'resume',), # These 2 to be removed in 3.0 # Client Error. 400: ('bad_request', 'bad'), 401: ('unauthorized',), 402: ('payment_required', 'payment'), 403: ('forbidden',), 404: ('not_found', '-o-'), 405: ('method_not_allowed', 'not_allowed'), 406: ('not_acceptable',), 407: ('proxy_authentication_required', 'proxy_auth', 'proxy_authentication'), 408: ('request_timeout', 'timeout'), 409: ('conflict',), 410: ('gone',), 411: ('length_required',), 412: ('precondition_failed', 'precondition'), 413: ('request_entity_too_large',), 414: ('request_uri_too_large',), 415: ('unsupported_media_type', 'unsupported_media', 'media_type'), 416: ('requested_range_not_satisfiable', 'requested_range', 'range_not_satisfiable'), 417: ('expectation_failed',), 418: ('im_a_teapot', 'teapot', 'i_am_a_teapot'), 421: ('misdirected_request',), 422: ('unprocessable_entity', 'unprocessable'), 423: ('locked',), 424: ('failed_dependency', 'dependency'), 425: ('unordered_collection', 'unordered'), 426: ('upgrade_required', 'upgrade'), 428: ('precondition_required', 'precondition'), 429: ('too_many_requests', 'too_many'), 431: ('header_fields_too_large', 'fields_too_large'), 444: ('no_response', 'none'), 449: ('retry_with', 'retry'), 450: ('blocked_by_windows_parental_controls', 'parental_controls'), 451: ('unavailable_for_legal_reasons', 'legal_reasons'), 499: ('client_closed_request',), # Server Error. 500: ('internal_server_error', 'server_error', '/o\\', '✗'), 501: ('not_implemented',), 502: ('bad_gateway',), 503: ('service_unavailable', 'unavailable'), 504: ('gateway_timeout',), 505: ('http_version_not_supported', 'http_version'), 506: ('variant_also_negotiates',), 507: ('insufficient_storage',), 509: ('bandwidth_limit_exceeded', 'bandwidth'), 510: ('not_extended',), 511: ('network_authentication_required', 'network_auth', 'network_authentication'),
高级操作
1.文件上传 import requests files = {'file': open('favicon.ico', 'rb')} response = requests.post("http://httpbin.org/post", files=files) print(response.text)
2.获取cookie import requests response = requests.get("https://www.baidu.com") print(response.cookies) for key, value in response.cookies.items(): print(key + '=' + value)
3.会话维持 import requests requests.get('http://httpbin.org/cookies/set/number/123456789') response = requests.get('http://httpbin.org/cookies') print(response.text) import requests s = requests.Session() s.get('http://httpbin.org/cookies/set/number/123456789') response = s.get('http://httpbin.org/cookies') print(response.text)
4.证书验证 import requests response = requests.get('https://www.12306.cn') print(response.status_code) import requests from requests.packages import urllib3 urllib3.disable_warnings() response = requests.get('https://www.12306.cn', verify=False) print(response.status_code) import requests response = requests.get('https://www.12306.cn', cert=('/path/server.crt', '/path/key')) print(response.status_code)
5.代理设置 import requests proxies = { "http": "http://127.0.0.1:9743", "https": "https://127.0.0.1:9743", } response = requests.get("https://www.taobao.com", proxies=proxies) print(response.status_code) import requests proxies = { "http": "http://user:password@127.0.0.1:9743/", } response = requests.get("https://www.taobao.com", proxies=proxies) print(response.status_code) pip3 install 'requests[socks]' import requests proxies = { 'http': 'socks5://127.0.0.1:9742', 'https': 'socks5://127.0.0.1:9742' } response = requests.get("https://www.taobao.com", proxies=proxies) print(response.status_code)
6.超时设置 import requests from requests.exceptions import ReadTimeout try: response = requests.get("http://httpbin.org/get", timeout = 0.5) print(response.status_code) except ReadTimeout: print('Timeout')
7.认证设置 import requests from requests.auth import HTTPBasicAuth r = requests.get('http://120.27.34.24:9001', auth=HTTPBasicAuth('user', '123')) print(r.status_code) import requests r = requests.get('http://120.27.34.24:9001', auth=('user', '123')) print(r.status_code)
8.异常处理 import requests from requests.exceptions import ReadTimeout, ConnectionError, RequestException try: response = requests.get("http://httpbin.org/get", timeout = 0.5) print(response.status_code) except ReadTimeout: print('Timeout') except ConnectionError: print('Connection error') except RequestException: print('Error')
大数据
2018-09-05 14:49:00