数据专栏

智能大数据搬运工,你想要的我们都有

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

1 config配置中心
1-1 pom依赖
这里引入 spring-cloud-config-server 即可,spring-boot-starter-security 只是为了给配置中心加一个访问验证,可以忽略该引用: org.springframework.cloud spring-cloud-config-server org.springframework.boot spring-boot-starter-security org.springframework.cloud spring-cloud-dependencies Camden.SR3 pom import
1-2 配置文件 application.yml
研发项目时,公司肯定会有几套环境,生产、测试、开发、本地等,所以我们的配置文件也应该是分别对应的,下图中是一个示例:config-file 是远程的git仓库,test 是项目下的一级文件夹,test下有两个文件夹分别存放 prd 和 dev 环境的配置文件,application.properties 是公用配置文件,config-test.properties中的 config-test 是其中一个微服务的名称,这样微服务公共的配置放到 application.properties,微服务个性化的配置在各自的配置文件中。注意:微服务的个性化配置会覆盖 application.properties 中的相同配置。
下面的配置默认加载远程仓库 test/dev 下的配置,如果密码中含有特殊字符,可以加转义符“\”,或者直接用单引号 “ ‘ ” 将密码引起来,这里用户名默认为user,密码:12345 spring: cloud: config: server: git: uri: https://code.aliyun.com/995586041/config-file.git searchPaths: test/dev username: 995586041@qq.com password: ',******' repos: prd: pattern: "*/prd*" uri: https://code.aliyun.com/995586041/config-file.git searchPaths: test/prd username: 995586041@qq.com password: \,****** dev: pattern: "*/dev*" uri: https://code.aliyun.com/995586041/config-file.git searchPaths: test/dev username: 995586041@qq.com password: ',******' server: port: 8888 security: user: password: ${CONFIG_SERVICE_PASSWORD:12345}
1-3 启动类
加上 @EnableConfigServer 注解即可 @SpringBootApplication @EnableConfigServer public class ConfigApplication { public static void main(String[] args) { SpringApplication.run(ConfigApplication.class, args); } }
1-4 访问
http://localhost:8888/test/application,如果上面设置了访问密码,会提示输入用户名和密码,填入相关信息进行验证,然后可以看到相关配置信息:
2 测试配置中心
1-1 pom文件 org.springframework.cloud spring-cloud-starter-eureka org.springframework.cloud spring-cloud-starter-config
1-2 配置文件 bootstrap.properties
这里配置文件使用 bootstrap.properties,项目启动时会先加载 bootstrap.properties 然后再加载 application.properties,这里我们指定 prd ,这样我们就可以指定了配置文件,注意这里的微服务名称为:config-test ,下边会用到 server.port = 8082 spring.profiles.active = prd spring.application.name = config-test spring.cloud.config.uri = http://127.0.0.1:8888 spring.cloud.config.fail-fast = true spring.cloud.config.username = user spring.cloud.config.password = 12345 spring.cloud.config.profile=prd goldleaf.test01 = test01 ${spring.profiles.active} bootstrap.properties goldleaf.test05 = test05 ${spring.profiles.active} bootstrap.properties goldleaf.test06 = test06 ${spring.profiles.active} bootstrap.properties
1-3 启动类及测试接口
可以看到,我们这里并没有加有关配置中心的特殊注解,我只是写了一个测试接口 @SpringBootApplication @RestController public class UserApplication { @Value("${goldleaf.test01}") private String test01; @Value("${goldleaf.test02}") private String test02; @Value("${goldleaf.test03}") private String test03; @Value("${goldleaf.test05}") private String test05; @Value("${goldleaf.test06}") private String test06; public static void main(String[] args) { SpringApplication.run(UserApplication.class, args); } @GetMapping("/test") public String getConfig() { StringBuilder builder = new StringBuilder(); builder.append("test01:" + test01 + "\r\n"); builder.append("test02:" + test02 + "\r\n"); builder.append("test03:" + test03 + "\r\n"); builder.append("test05:" + test05 + "\r\n"); builder.append("test06:" + test06 + "\r\n"); return builder.toString(); } }
1-4 测试
现在我们有了三个配置文件:项目里的 bootstrap.properties、远程的 application.properties 和远程的 config-test.properties。配置说明:1). 在上面三个文件中分别定义了 test01、test02、test03,用来说明三个配置文件中的配置都是起作用的;2). 在上面三个文件中同时定义了test05,用来说明三个文件的优先级;3). 在 bootstrap.properties 和 application.properties 中同时定义了 test06 ,用来确定二者的优先级。启动项目之后,现在我们通过访问 config-test 的test接口查看一下结果:

test01-03,说明三个文件中的配置都有加载;test05 说明微服务的个性化配置会覆盖前面的配置,优先级最高;test06 说明默认情况下远程仓库的 application.properties 优先级高于 bootstrap.properties
优先级顺序:远程config-test > 远程application.properties > 本地bootstrap.properties
3 本地覆盖远程配置
在远程仓库的git仓库中添加下列配置: # 允许本地配置覆盖远程配置 spring.cloud.config.allowOverride=true spring.cloud.config.overrideNone=true spring.cloud.config.overrideSystemProperties=false
然后访问测试接口:http://127.0.0.1:8082/test,test05、test06加载的是本地bootstrap.properties中的配置信息:
4 手动刷新配置
开发的时候,难免会更改某些配置,如果每次更改配置都进行服务的重新发布,有点让人头大,所以我们在更改配置文件之后,手动刷新一下配置。
在非config端的pom文件中增加 spring-boot-starter-actuator 依赖: org.springframework.cloud spring-cloud-starter-eureka org.springframework.cloud spring-cloud-starter-config org.springframework.boot spring-boot-starter-actuator
修改git仓库的配置文件之后,POST方式调用服务的 /refresh 端点,如:http://127.0.0.1:8082/refresh 这里会返回有哪些配置点被修改:
查看结果:
5 项目代码
core-simple 项目: https://code.aliyun.com/995586041/core-simple.git
config-server: https://code.aliyun.com/995586041/config.git
config-client: https://code.aliyun.com/995586041/config-client.git
config-file: https://code.aliyun.com/995586041/config-file.git
软件开发
2018-05-07 21:52:00
上一个项目的开发中需要实现从word中读取表格数据的功能,在JAVA社区搜索了很多资料,终于找到了两个相对最佳的方案,因为也得到了不少网友们的帮助,所以不敢独自享用,在此做一个分享。
两个方案分别是:一,用POI的TableIterator获取表格中的数据;二,用PageOffice来获取。
  为什么说是两个相对最佳的方案呢?因为两个方案都各有优缺点,POI的优点很明显,就是免费,这正是PageOffice的缺点,PageOffice是一个国产的商业Office组件;POI的缺点有点多,接口复杂调用起来比较麻烦,尤其是不好读取word指定位置处的内容。由于获取表格数据的代码是在服务器端执行的,所以要求很高的代码质量,要考虑到代码执行效率问题、用户请求并发问题、大文档执行慢阻塞页面的问题等等,POI的架构属于仿VBA接口的模型,比VBA代码还要复杂,在调用方便上未做任何优化,光看代码都觉得头疼。所以在实际使用的过程中会遇到这些问题需要自己解决,相对来说这正是PageOffice的优点,使用PageOffice的话,就不会遇到这些问题,因为PageOffice的获取word中表格数据的工作是在客户端执行的,确实也符合了分布式计算思想,减轻服务器端压力,还有个强悍的功能,PageOffice可以从word表格中用很简单一句代码把图片提取出来!!!
  PageOffice虽是收费的,但是事半功倍,而且还能实现许多POI无法实现的功能。如果确实预算紧张,还是需要用POI,再难用也要捏着鼻子用了……,闲话少撤,看代码实现。
  PageOffice获取word表格中数据的核心代码: WordDocument doc = new WordDocument(request,response); DataRegion dataReg = doc.openDataRegion("PO_table"); Table table = dataReg.openTable(1); String cellValue = table.openCellRC(1,2).getValue(); //获取书签“PO_table”中表格里第1行第2列单元格的值 doc.close();
  以上代码是从例子代码里拷贝出来的,可以从PageOffice的官网下载中心下载“PageOffice for JAVA ”,把PageOffice开发包里的Samples4运行起来,看示例(二、16、获取Word文件中表格的数据)里面的具体代码和实现效果。
  需要说明一点,PageOffice中提到了一个数据区域(DataRegion)的概念,其实所谓的数据区域本质上就是书签,但是这个书签必须以“PO_” 开头。把表格放到数据区域中貌似不方便,但是好处很大,如果word文件中有多个表格的话,可以用数据区域去指定PageOffice获取word中哪个表格的数据,定位非常方便,比方说PO_Table的书签里有一个表格,那么不管这个表格在整个word文件中是第几个表(word中的表格没有名称只有Index,从文件头到末尾依次编号的)用doc.openDataRegion("PO_table").openTable(1);总是可以获取到这个表格的数据,非常方便,用POI就不行了,表格、图片位置移动,代码必须重写。
  就写这么多吧,做个共享,希望对大家都有帮助。
软件开发
2018-05-07 15:35:00
一、需求背景
  在项目开发中,经常会遇到导出Excel报表文件的情况,因为很多情况下,我们需要打印Excel报表,虽然在网页上也可以生成报表,但是打印网上里的报表是无法处理排版问题的,所以最好的方式,还是生成Excel文件。
PageOffice封装了一组用于动态输出数据到Excel文档的相关类,全部包含在com.zhuozhengsoft.pageoffice.excelwriter 命名空间之中。PageOffice对Excel的赋值操作分两种方式:1. 单元格赋值,这个很好理解,sheet.openCell("D5"),返回值就是一个Cell对象;2. 针对一个区域赋值。这个区域在PageOffice的概念里就是Table对象,比如:sheet.openTable("C9:H15")的返回值就是就是Table对象,这个Table就是”C9:H15”这个区域。下面就针对这两种操作方式来分别介绍。
二、 给Excel单元格赋值
  创建Workbook对象,操作指定sheet中的指定单元格,在打开Excel文件后通过PageOfficeCtrl对象的setWriter方法把数据写入到Excel文件中:
Workbook wb = new Workbook(); Sheet sheet = wb.openSheet("销售订单"); sheet.openCell("D5").setValue(“北京某某公司”); PageOfficeCtrl poCtrl1 = new PageOfficeCtrl(request); poCtrl1.setServerPage("poserver.do"); poCtrl1.setWriter(wb); poCtrl1.webOpen("{模板文件路径}", OpenModeType.xlsSubmitForm, "");
  通过上面的代码可以看出,给Excel单元格赋值,首先需要创建Workbook对象,然后通过此对象的OpenSheet方法,获取到Sheet对象,再通过Sheet对象的OpenCell方法就可以获取的Cell对象,进行赋值或其他操作。
  Sheet对象有两个方法可以获取到Cell对象:1. openCell(String CellAddress),参数为单元格引用字符串。例如:"A1";2. openCellRC(int Row, int Col),参数为excel单元格的行数和列数。所以上面给Excel单元格赋值的代码改成下面的代码也是可以的。 sheet.openCellRC(5,4).setValue(“北京某某公司”);
三、设置Cell的样式
  
  这些属性不但可以用来设置单元格的前景色、背景色、边框、字体和对齐方式,甚至可以设置公式,基本上所有的单元格设置需求都可以实现。比如:设置一个单元格的背景色为为绿色。
   Workbook wb = new Workbook(); wb.openSheet("Sheet1").openCell("E16").setBackColor(new Color(0, 128, 128));
  果要设置单元格的字体,就需要操作Font对象进行设置;如果要设置单元格的边框样式,就需要操作Border对象进行设置。使用Border对象设置Excel的单元格样式,是可以分别对单元格的上下左右边框单独设置样式的,所以再复杂的表格样式用PageOffice也可以“绘制”出来。PageOffice中的Table对象可以设置Table的Border样式,所以在此不作详细的叙述,下面单独用一个章节来叙述Border的设置。
四、操作Excel中的区域(Table) 
  PageOffice开发平台中,针对Excel文件的处理增加了一个“Table”的概念,一个Table指的就是一个区域,例如:sheet.OpenTable("C9:H15")的返回值就是就是Table对象,这个Table所操作的区域就是”C9:H15”。 为何需要这个Table的概念呢?下面就说明一下使用Table对象的优点。
在实际的项目需求中,常常会需要在Excel 中循环的插入多条数的数据,比如:需要在excel中以B11单元格为起始位置,插入10条包含6个字段的数据,如果使用Cell对象写一个循环程序给单元格赋值会是这样的:
DataTable dt = new DataTable(); for (int i = 0; i < 10; i++) // 10条数据 { sheet.OpenCellRC(“B”+(11+i).ToString())Value = dt.Rows[i][0].ToString(); sheet.OpenCellRC(“C”+(11+i).ToString())Value = dt.Rows[i][1].ToString(); sheet.OpenCellRC(“D”+(11+i).ToString())Value = dt.Rows[i][2].ToString(); sheet.OpenCellRC(“E”+(11+i).ToString())Value = dt.Rows[i][3].ToString(); sheet.OpenCellRC(“F”+(11+i).ToString())Value = dt.Rows[i][4].ToString(); sheet.OpenCellRC(“G”+(11+i).ToString())Value = dt.Rows[i][5].ToString(); }
  如果使用Table对象编程,就与操作数据集的概念一样,代码也更灵活,代码如下:
DataTable dt = new DataTable(); PageOffice.ExcelWriter.Table table1 = sheet.OpenTable("B11:G20"); for (int i = 0; i < 10; i++) // 10条数据 { for (int j = 0; j <6; j++) // 6个字段 { table1.DataFields[j].Value = dt.Rows[i][j].ToString(); } table1.NextRow(); } table1.Close();
  通过Cell实现这个赋值操作还有一个方法,使用OpenCellRC方法,通过行列号操作会更简单,代码如下:
DataTable dt = new DataTable(); for (int i = 0; i < 10; i++) // 10条数据 { for (int j = 0; j <6; j++) // 6个字段 { sheet.OpenCellRC(11+i,2+j )Value = dt.Rows[i][j].ToString(); } }
  但是这个代码相对于Table对象的操作来说有点晦涩,只是看OpenCellRC中的参数是不容易立刻知道操作的是哪个单元格的,还有一个情况是通过Cell赋值无法做到的,那就是在已有的表格模板中插入不定行数的数据。例如:下图中的模板在“合计”之前只有10行空白行,怎么动态插入50条数据并且数据行的样式也统一呢?这种情况使用Cell是无法解决问题的,但是前面使用Table给Excel赋值的代码就可以解决这个问题。使用Table赋值的特点是:在赋值的过程中,如果Table所包含的区域行数不够,那么Table会自动插入行,并且循环重复使用Table区域中各行的样式,直到所有的数据都填充完毕。
软件开发
2018-05-11 11:33:00

修改页面出现默认值,.默认值为value="${dNotice.noticeName}"

软件开发
2018-05-19 14:17:00
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。
RabbitMQ具有高可用性、高性能、灵活性等特点。
RabbitMQ介绍
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
相关概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。 中间即是 RabbitMQ, 其中包括了 交换机 和 队列。 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
那么, 其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中, 用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。 交换机: Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念: 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct、topic、Headers、Fanout Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key , 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去. Topic:按规则转发消息(最灵活) Headers:设置header attribute参数类型的交换机 Fanout:转发消息到所有绑定队列
Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。 当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下: 路由键必须是一串字符,用句号( . ) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。 路由模式必须包含一个 星号( * ),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 #就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下: rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符: * 表示一个词. # 表示零个或多个词.
Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式, 会把消息发给绑定给它的全部队列 ,如果配置了routing_key会被忽略。
springboot集成RabbitMQ
springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。
简单使用
1、配置pom包,主要是添加spring-boot-starter-amqp的支持 org.springframework.boot spring-boot-starter-amqp
2、配置文件
配置rabbitmq的安装地址、端口以及账户信息 spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=192.168.0.110 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
3、队列配置 @Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("rabbit"); } }
3、发送者
rabbitTemplate是springboot 提供的默认实现 public class RabbitSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "rabbit" + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("rabbit", context); } }
4、接收者 @Component @RabbitListener(queues = "rabbit") public class RabbitReceiver { @RabbitHandler public void process(String rabbit) { System.out.println("Receiver : " + rabbit); } }
5、测试 @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private RabbitSender rabbitSender; @Test public void rabbit() throws Exception { rabbitSender.send(); } } 注意,发送者和接收者的queue name必须一致,不然不能接收
多对多使用
一个发送者,N个接收者或者N个发送者和N个接收者会出现什么情况呢?
一对多发送
对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果 @Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); } }
结果如下: Receiver 1: spirng boot neo queue ****** 11 Receiver 2: spirng boot neo queue ****** 12 Receiver 2: spirng boot neo queue ****** 14 Receiver 1: spirng boot neo queue ****** 13 Receiver 2: spirng boot neo queue ****** 15 Receiver 1: spirng boot neo queue ****** 16 Receiver 1: spirng boot neo queue ****** 18 Receiver 2: spirng boot neo queue ****** 17 Receiver 2: spirng boot neo queue ****** 19 Receiver 1: spirng boot neo queue ****** 20
根据返回结果得到以下结论 一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送 @Test public void manyToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); neoSender2.send(i); } }
结果如下: Receiver 1: spirng boot neo queue ****** 20 Receiver 2: spirng boot neo queue ****** 20 Receiver 1: spirng boot neo queue ****** 21 Receiver 2: spirng boot neo queue ****** 21 Receiver 1: spirng boot neo queue ****** 22 Receiver 2: spirng boot neo queue ****** 22 Receiver 1: spirng boot neo queue ****** 23 Receiver 2: spirng boot neo queue ****** 23 Receiver 1: spirng boot neo queue ****** 24 Receiver 2: spirng boot neo queue ****** 24 Receiver 1: spirng boot neo queue ****** 25 Receiver 2: spirng boot neo queue ****** 25 结论:和一对多一样,接收端仍然会均匀接收到消息
高级使用
对象的支持
springboot以及完美的支持对象的发送和接收,不需要格外的配置。 //发送者 public void send(User user) { System.out.println("Sender object: " + user.toString()); this.rabbitTemplate.convertAndSend("object", user); } ... //接收者 @RabbitHandler public void process(User user) { System.out.println("Receiver object : " + user); }
结果如下: Sender object: User{name='neo', pass='123456'} Receiver object : User{name='neo', pass='123456'}
Topic Exchange
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列来测试 @Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
使用queueMessages同时匹配两个队列,queueMessage只匹配”topic.message”队列 public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context); }
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置 @Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); }
结果如下: Sender : hi, fanout msg ... fanout Receiver B: hi, fanout msg fanout Receiver A : hi, fanout msg fanout Receiver C: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息
软件开发
2018-05-30 09:59:00
1. 问题描述
1.1 实体类的命名
1.2 数据库列
1.3 结果
导致 不能讲结果封装到对象上
2. 解决办法 (注意代码添加的位置)
2.1 添加 代码




2.2 测试


软件开发
2018-06-12 11:33:00
#prepareBeanFactory protected void prepareBeanFactory(ConfigurableListableBeanFactory beanFactory) { // 设置 BeanFactory 的类加载器,我们知道 BeanFactory 需要加载类,也就需要类加载器, // 这里设置为当前 ApplicationContext 的类加载器 beanFactory.setBeanClassLoader(this.getClassLoader()); // 设置 BeanExpressionResolver beanFactory.setBeanExpressionResolver(new StandardBeanExpressionResolver(beanFactory.getBeanClassLoader())); beanFactory.addPropertyEditorRegistrar(new ResourceEditorRegistrar(this, this.getEnvironment())); // 添加一个 BeanPostProcessor,这个 processor 比较简单, // 实现了 Aware 接口的几个特殊的 beans 在初始化的时候,这个 processor 负责回调 beanFactory.addBeanPostProcessor(new ApplicationContextAwareProcessor(this)); // 下面几行的意思就是,如果某个 bean 依赖于以下几个接口的实现类,在自动装配的时候忽略它们, // Spring 会通过其他方式来处理这些依赖 beanFactory.ignoreDependencyInterface(EnvironmentAware.class); beanFactory.ignoreDependencyInterface(EmbeddedValueResolverAware.class); beanFactory.ignoreDependencyInterface(ResourceLoaderAware.class); beanFactory.ignoreDependencyInterface(ApplicationEventPublisherAware.class); beanFactory.ignoreDependencyInterface(MessageSourceAware.class); beanFactory.ignoreDependencyInterface(ApplicationContextAware.class); /** * 下面几行就是为特殊的几个 bean 赋值,如果有 bean 依赖了以下几个,会注入这边相应的值, * 之前我们说过,"当前 ApplicationContext 持有一个 BeanFactory",这里解释了第一行 * ApplicationContext 继承了 ResourceLoader、ApplicationEventPublisher、MessageSource * 所以对于这几个,可以赋值为 this,注意 this 是一个 ApplicationContext * 那这里怎么没看到为 MessageSource 赋值呢?那是因为 MessageSource 被注册成为了一个普通的 bean */ beanFactory.registerResolvableDependency(BeanFactory.class, beanFactory); beanFactory.registerResolvableDependency(ResourceLoader.class, this); beanFactory.registerResolvableDependency(ApplicationEventPublisher.class, this); beanFactory.registerResolvableDependency(ApplicationContext.class, this); // 这个 BeanPostProcessor 也很简单,在 bean 实例化后,如果是 ApplicationListener 的子类, // 那么将其添加到 listener 列表中,可以理解成:注册事件监听器 beanFactory.addBeanPostProcessor(new ApplicationListenerDetector(this)); //先不管这一句 if (beanFactory.containsBean("loadTimeWeaver")) { beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory)); beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader())); } //个人理解:以下提现了spring的思想,一切东西要注入到bean工厂,这样以后处理过程中,获取方式,使用方式将统一 // 如果没有定义 "environment" 这个 bean,那么 Spring 会 "手动" 注册一个 if (!beanFactory.containsLocalBean("environment")) { beanFactory.registerSingleton("environment", this.getEnvironment()); } // 如果没有定义 "systemProperties" 这个 bean,那么 Spring 会 "手动" 注册一个 if (!beanFactory.containsLocalBean("systemProperties")) { beanFactory.registerSingleton("systemProperties", this.getEnvironment().getSystemProperties()); } // 如果没有定义 "systemEnvironment" 这个 bean,那么 Spring 会 "手动" 注册一个 if (!beanFactory.containsLocalBean("systemEnvironment")) { beanFactory.registerSingleton("systemEnvironment", this.getEnvironment().getSystemEnvironment()); } }
软件开发
2020-07-21 15:55:00
obtainFreshBeanFactory
前提说明在springboot的createApplicationContext的时候,BeanFactory已经创建完成了 protected ConfigurableListableBeanFactory obtainFreshBeanFactory() { this.refreshBeanFactory(); return this.getBeanFactory(); }
refreshBeanFactory是一个抽象方法,spring中两个类实现了这个方法 AbstractRefreshableApplicationContext GenericApplicationContext
会走哪一个呢,看看类继承图吧 因为springboot创建的是一个AnnotationConfigServletWebServerApplicationContext 他的架构图如下 //-----
GenericApplicationContext和AbstractRefreshableApplicationContext
这里简要说明一下这两个类 AbstractApplicationContext是对ApplicationContext的一个简单抽象实现。 AbstractApplicationContext有两大子类GenericApplicationContext和AbstractRefreshableApplicationContext。 GenericApplictionContext 及其子类持有一个单例的固定的DefaultListableBeanFactory实例,在创建GenericApplicationContext实例的时候就会创建DefaultListableBeanFactory实例。 固定的意思就是说,即使调用refresh方法,也不会重新创建BeanFactory实例。 AbstractRefreshableApplicationContext 它实现了所谓的热刷新功能,它内部也持有一个DefaultListableBeanFactory实例,每次刷新refresh()时都会销毁当前的BeanFactory实例并重新创建DefaultListableBeanFactory实例。
1. refreshBeanFactory
GenericApplicationContext protected final void refreshBeanFactory() throws IllegalStateException { if (!this.refreshed.compareAndSet(false, true)) { throw new IllegalStateException("GenericApplicationContext does not support multiple refresh attempts: just call 'refresh' once"); } else { this.beanFactory.setSerializationId(this.getId()); } }
很简单设置个id而已 从这里也可以明显看出 不允许重新刷新
2.getBeanFactory public final ConfigurableListableBeanFactory getBeanFactory() { return this.beanFactory; }
也很简单,返回了自己
软件开发
2020-07-21 15:54:00
在实际开发中,经常需要排查一条消息是否成功发送到底层MQ中,或者查看MQ中消息的内容,以及如何将消息发送给指定的/所有的消费者组重新消费。本文对RocketMQ提供到的查询机制和背后原理进行深入的介绍。文章主要包括4个部分: 消息查询介绍:介绍消息查询中使用到的Message Key 、Unique Key、Message Id 的区别 消息查询工具:分别介绍命令行工具、管理平台、客户端API这三种工具的详细用法,以及如何让消费者重新消费特定的消息。 核心实现原理:介绍Message Key & Unique Key与Message Id的实现机制上区别,Unique Key在Exactly Once语义下的作用,以及为什么Message Id查询效率更高。 索引机制:介绍Message Key & Unique Key底层使用的哈希索引机制

1 消息查询介绍
RocketMQ提供了3种消息查询方式: 按照Message Key 查询:消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。 按照Unique Key查询:除了业务开发同学明确的指定消息中的key,RocketMQ生产者客户端在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。 按照Message Id 查询:Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移信息,并会将Message Id作为发送结果的一部分进行返回。Message Id中属于精确匹配,可以唯一定位一条消息,不需要使用哈希索引机制,查询效率更高。
RocketMQ有意弱化Unique Key与Message Id的区别,对外都称之为Message Id。在通过RocketMQ的命令行工具或管理平台进行查询时,二者可以通用。在根据Unique Key进行查询时,本身是有可能查询到多条消息的,但是查询工具会进行过滤,只会返回一条消息。种种情况导致很多RocketMQ的用户,并未能很好对二者进行区分。
业务开发同学在使用RocketMQ时,应该养成良好的习惯,在发送/消费消息时,将这些信息记录下来,通常是记录到日志文件中,以便在出现问题时进行排查。
以生产者在发送消息为例,通常由以下3步组成: //1 构建消息对象Message Message msg = new Message(); msg.setTopic("TopicA"); msg.setKeys("Key1"); msg.setBody("message body".getBytes()); try{ //2 发送消息 SendResult result = producer.send(msg); //3 打印发送结果 System.out.println(result); }catch (Exception e){ e.printStackTrace(); }
第1步:构建消息
构建消息对象Message,在这里我们通过setKeys方法设置消息的key,如果有多个key可以使用空格" "进行分割
第2步:发送消息
发送消息,会返回一个SendResult对象表示消息发送结果。
第3步:打印发送结果
结果中包含Unique Key和Message Id,如下所示: SendResult [ sendStatus=SEND_OK, msgId=C0A801030D4B18B4AAC247DE4A0D0000, offsetMsgId=C0A8010300002A9F000000000007BEE9, messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], queueOffset=0]
其中: sendStatus:表示消息发送结果的状态 msgId:注意这里的命名虽然是msgId,但实际上其是Unique Key offsetMsgId:Broker返回的Message ID 。在后文中,未进行特殊说明的情况下,Message ID总是表示offsetMsgId。 messageQueue:消息发送到了哪个的队列,如上图显示发送到broker-a的第0个的队列 queueOffset:消息在队列中的偏移量,每次发送到一个队列时,offset+1
事实上,用户主动设置的Key以及客户端自动生成的Unique Key,最终都会设置到Message对象的properties属性中,如下图所示:

其中: KEYS:表示用户通过setKeys方法设置的消息key, UNIQ_KEY:表示消息发送之前由RocketMQ客户端自动生成的Unique Key。细心的读者发现了其值与上述打印SendResult结果中的msgId字段的值是一样的,这验证了前面所说的msgId表示的实际上就是Unique Key的说法。
在了解如何主动设置Key,以及如何获取RocketMQ自动生成的Unique Key和Message Id后,就可以利用一些工具来进行查询。

2 消息查询工具
RocketMQ提供了3种方式来根据Message Key、Unique Key、Message Id来查询消息,包括: 命令行工具:主要是运维同学使用 管理平台:运维和开发同学都可以使用 客户端API:主要是开发同学使用
这些工具除了可以查询某条消息的内容,还支持将查询到的历史消息让消费者重新进行消费,下面分别进行讲述。
2.1 命令行工具
RocketMQ自带的mqadmin命令行工具提供了一些子命令,用于查询消息,如下: $ sh bin/mqadmin The most commonly used mqadmin commands are: ... queryMsgById 按照Message Id查询消息 queryMsgByKey 按照Key查询消息 queryMsgByUniqueKey 按照UNIQ_KEY查询消息 ...
此外,还有一个queryMsgByOffset子命令,不在本文讲述范畴内
2.1.1 按照Message Key查询
mqadmin工具的queryMsgByKey子命令提供了根据key进行查询消息的功能。注意,由于一个key可能对应多条消息,查询结果只会展示出这些消息对应的Unique Key,需要根据Unique Key再次进行查询。
queryMsgByKey子命令使用方法如下所示: $ sh bin/mqadmin queryMsgByKey -h usage: mqadmin queryMsgByKey [-h] -k [-n ] -t -h,--help 打印帮助信息 -k,--msgKey 指定消息的key,必须提供 -n,--namesrvAddr 指定nameserver地址 -t,--topic 指定topic,必须提供
例如,要查询在TopicA中,key为Key1的消息 $ sh bin/mqadmin queryMsgByKey -k Key1 -t TopicA -n localhost:9876 #Message ID #QID #Offset C0A80103515618B4AAC2429A6E970000 0 0 C0A80103511B18B4AAC24296D2CB0000 0 0 C0A8010354C418B4AAC242A281360000 1 0 C0A8010354C718B4AAC242A2B5340000 1 1
这里,我们看到输出结果中包含了4条记录。其中: Message ID列:这里这一列的名字显示有问题,实际上其代表的是Unique Key QID列:表示队列的ID,注意在RocketMQ中唯一地位一个队列需要topic+brokerName+queueId。这里只显示了queueId,其实并不能知道在哪个Broker上。 Offset:消息在在队列中的偏移量
在查询到Unique Key之后,我们就可以使用另外一个命令:queryMsgByUniqueKey,来查询消息的具体内容。
2.1.2 按照Unique Key查询
mqadmin工具的 queryMsgByUniqueKey 的子命令有2个功能: 根据Unique Key查询消息,并展示结果 让消费者重新消费Unique Key对应的消息
我们将分别进行讲述。queryMsgByUniqueKey子命令的使用方式如下: $ sh bin/mqadmin queryMsgByUniqueKey -h usage: mqadmin queryMsgByUniqueKey [-d ] [-g ] [-h] -i [-n ] -t -d,--clientId 消费者 client id -g,--consumerGroup 消费者组名称 -h,--help 打印帮助信息 -i,--msgId 消息的Unique Key,或者Message Id -n,--namesrvAddr NameServer地址 -t,--topic 消息所属的Topic,必须提供
这里对-i 参数进行下特殊说明,其即可接受Unique Key,即SendResult中的msgId字段;也可以接受Message Id,即SendResult中的offsetMsgId字段。
根据Unique Key查询消息:
通过-i 参数指定Unique Key,通过-t 参数指定topic,如: $ sh bin/mqadmin queryMsgByUniqueKey -i C0A80103511B18B4AAC24296D2CB0000 -t TopicA -n localhost:9876 Topic: TopicA Tags: [null] Keys: [Key1] Queue ID: 0 Queue Offset: 0 CommitLog Offset: 507625 Reconsume Times: 0 Born Timestamp: 2019-12-13 22:19:40,619 Store Timestamp: 2019-12-13 22:19:40,631 Born Host: 192.168.1.3:53974 Store Host: 192.168.1.3:10911 System Flag: 0 Properties: {KEYS=Key1, UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000, WAIT=true} Message Body Path: /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
对于消息体的内容,会存储到Message Body Path字段指定到的路径中。可通过cat命令查看(仅适用于消息体是字符串): $ cat /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000 message body
指定消费者重新消费:
queryMsgByUniqueKey子命令还接收另外两个参数:-g参数用于指定消费者组名称,-d参数指定消费者client id。指定了这两个参数之后,消息将由消费者直接消费,而不是打印在控制台上。
首先,通过consumerStatus命令,查询出消费者组下的client id信息,如: $ sh bin/mqadmin consumerStatus -g group_X -n localhost:9876 001 192.168.1.3@26868 V4_5_0 1576300822831/192.168.1.3@26868 Same subscription in the same group of consumer Rebalance OK
这里显示了消费者组group_X下面只有一个消费者,client id为192.168.1.3@26868。
接着我们可以在queryMsgByUniqueKey子命令中,添加-g和-d参数,如下所示: $ sh bin/mqadmin queryMsgByUniqueKey \ -g group_X \ -d 192.168.1.3@26868 \ -t TopicA \ -i C0A80103511B18B4AAC24296D2CB0000 \ -n localhost:9876 ConsumeMessageDirectlyResult [ order=false, autoCommit=true, consumeResult=CR_SUCCESS, remark=null, spentTimeMills=1]
可以看到,这里并没有打印出消息内容,取而代之的是消息消费的结果。
在内部,主要是分为3个步骤来完成让指定消费者来消费这条消息,如下图所示:
第1步:
命令行工具给所有Broker发起QUERY_MESSAGE请求查询消息,因为并不知道UNIQ_KEY这条消息在哪个Broker上,且最多只会返回一条消息,如果超过1条其他会过滤掉;如果查询不到就直接报错。
第2步:
根据消息中包含了Store Host信息,也就是消息存储在哪个Broker上,接来下命令行工具会直接给这个Broker发起CONSUME_MESSAGE_DIRECTLY请求,这个请求会携带msgId,group和client id的信息
第3步:
Broker接收到这个请求,查询出消息内容后,主动给消费者发送CONSUME_MESSAGE_DIRECTLY通知请求,注意虽然与第2步使用了同一个请求码,但不同的是这个请求中包含了消息体的内容,消费者可直接处理。注意:这里并不是将消息重新发送到Topic中,否则订阅这个Topic的所有消费者组,都会重新消费这条消息。
2.1.3 根据Message Id进行查询
前面讲解生产者发送消息后,返回的SendResult对象包含一个offsetMsgId字段,这也就是我们常规意义上所说的Message Id,我们也可以根据这个字段来查询消息。
根据Message Id查询使用queryMsgById子命令,这个命令有3个作用: 根据Message Id查询消息 通知指定消费者重新消费这条消息,与queryMsgByUniqueKey类似,这里不再介绍 将消息重新发送到Topic中,所有消费者组都将重新消费
queryMsgById子命令用法如下所示: $ sh bin/mqadmin queryMsgById -h usage: mqadmin queryMsgById [-d ] [-g ] [-h] -i [-n ] [-s ] [-u ] -d,--clientId 消费者id -g,--consumerGroup 消费者组名称 -h,--help 打印帮助信息 -i,--msgId Message Id -n,--namesrvAddr Name server 地址 -s,--sendMessage 重新发送消息 -u,--unitName unit name
参数说明如下:
-d和-g参数:类似于queryMsgById命令,用于将消息发送给某个消费者进行重新消费
-i 参数:指定Message Id,即SendResult对象的offsetMsgId字段,多个值使用逗号","分割。
-s参数:是否重新发送消息到Topic。如果同时指定了-d和-g参数,-s参数不生效。
根据Message Id查询消息:
下图根据SendResult的offsetMsgId字段,作为-i参数,来查询一条消息: $ sh bin/mqadmin queryMsgById -i C0A8010300002A9F000000000007BEE9 -n localhost:9876 OffsetID: C0A8010300002A9F000000000007BEE9 OffsetID: C0A8010300002A9F000000000007BEE9 Topic: TopicA Tags: [null] Keys: [Key1] Queue ID: 0 Queue Offset: 0 CommitLog Offset: 507625 Reconsume Times: 0 Born Timestamp: 2019-12-13 22:19:40,619 Store Timestamp: 2019-12-13 22:19:40,631 Born Host: 192.168.1.3:53974 Store Host: 192.168.1.3:10911 System Flag: 0 Properties: {KEYS=Key1, UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000, WAIT=true} Message Body Path: /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
与queryMsgByUniqueKey子命令输出基本类似,主要是在输出开头多出了OffsetID字段,即offsetMsgId。需要注意的是,queryMsgById不能接受Unqiue Key作为查询参数。
重新发送消息到topic:
在指定-s参数后,消息将重新发送到topic,如下(输出进行了格式化): $ sh bin/mqadmin queryMsgById -i C0A8010300002A9F000000000007BEE9 -n localhost:9876 -s true prepare resend msg. originalMsgId=C0A8010300002A9F000000000007BEE9 SendResult [ sendStatus=SEND_OK, msgId=C0A80103511B18B4AAC24296D2CB0000, offsetMsgId=C0A80103000078BF000000000004D923, messageQueue=MessageQueue [topic=TopicA, brokerName=broker-b, queueId=1], queueOffset=1]
可以看到,这里因为消息是重新发送到了Topic中,因此与我们之前使用生产者发送消息一样,输出的是一个SendResult。在这种情况下,订阅这个Topic的所有消费者组都会重新消费到这条消息。
在实际开发中,如果多个消费者组订阅了某个Topic的消息,如果所有的消费者都希望重新消费,那么就应该使用-s参数。如果只是某个消费者希望重新消费,那么应该指定-g和-d参数。
另外,我们看到发送前打印的originalMsgId和发送后SendResult中的offsetMsgId值并不一样,这是因为消息发送到Topic重新进行了存储,因此值不相同。这也是为什么我们说Message Id可以唯一对应一条消息的原因。
而输出的SendResult结果中的msgId,即Unique Key,并没有发生变化,因此尽管名字是Unique Key,但是实际上还是有可能对应多条消息的。而前面根据queryMsgByUniqueKey查询之所以只有一条消息,实际上是进行了过滤。
2.2 管理平台
RocketMQ提供的命令行工具,虽然功能强大,一般是运维同学使用较多。通过RocketMQ提供的管理平台进来行消息查询,则对业务开发同学更加友好。在管理平台的消息一栏,有3个TAB,分别用于:根据Topic时间范围查询、Message Key查询、Message Id查询,下面分别进行介绍。
根据Topic时间范围查询:
按 Topic 查询属于范围查询,不推荐使用,因为时间范围内消息很多,不具备区分度。查询时,尽可能设置最为精确的时间区间,以便缩小查询范围,提高速度。最多返回2000条数据。
根据Message Key查询:
按 Message Key 查询属于模糊查询,仅适用于没有记录 Message ID 但是设置了具有区分度的 Message Key的情况。 目前,根据Message Key查询,有一个很大局限性:不能指定时间范围,且最多返回64条数据。如果用户指定的key重复率比较高的话,就有可能搜不到。
根据Message Id查询:
按 Message ID 查询属于精确查询,速度快,精确匹配,只会返回一条结果,推荐使用。在这里,传入Unique Key,offsetMsgId都可以。
查看消息详情:
在按照Topic 时间范围查询,按照Message Key查询,结果列表有一个Message Detail按钮,点击可以看到消息详情:包括消息key、tag、生成时间,消息体内容等。在详情页面,也可以将消息直接发送给某个消费者组进行重新消费。
需要注意的是,在消息体展示的时候,只能将消息体转换成字符串进行展示,如果消息的内容是protobuf、thrift、hessian编码的,那么将显示一堆乱码。
如果公司内部有统一的IDL/Schema管理平台,则可以解决这个问题,通过为每个Topic关联一个IDL,在消息展示时,可以根据IDL反序列化后在进行展示。
2.3 客户端API
除了通过命令行工具和管理平台,还可以通过客户端API的方式来进行查询,这其实是最本质的方式,命令行工具和管理平台的查询功能都是基于此实现。 在org.apache.rocketmq.client.MQAdmin接口中,定义了以下几个方法用于消息查询: //msgId参数:仅接收SendResult中的offsetMsgId,返回单条消息 MessageExt viewMessage(final String msgId) //msgId参数:传入SendResult中的offsetMsgId、msgId都可以,返回单条消息 MessageExt viewMessage(String topic,String msgId) //在指定topic下,根据key进行查询,并指定最大返回条数,以及开始和结束时间 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end)
对于MQAdmin接口,可能部分同学比较陌生。不过我们常用的DefaultMQProducer、DefaultMQPushConsumer等,都实现了此接口,因此都具备消息查询的能力,如下所示:
对于命令行工具,底层实际上是基于MQAdminExt接口的实现来完成的。
细心的读者会问,相同的查询功能在在多处实现是不是太麻烦了?事实上,这只是对外暴露的接口,在内部,实际上都是基于MQAdminImpl这个类来完成的。
viewMessage方法:
两种viewMessage方法重载形式,都只会返回单条消息。下面以生产者搜索为例,讲解如何使用API进行查询: //初始化Producer DefaultMQProducer producer = new DefaultMQProducer(); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //根据UniqueKey查询 String uniqueKey = "C0A8010354C418B4AAC242A281360000"; MessageExt msg = producer.viewMessage("TopicA", uniqueKey); //打印结果:这里仅输出Unique Key与offsetMsgId MessageClientExt msgExt= (MessageClientExt) msg; System.out.println("Unique Key:"+msgExt.getMsgId()//即UNIQUE_KEY +"\noffsetMsgId:"+msgExt.getOffsetMsgId());
输出结果如下: Unique Key:C0A8010354C418B4AAC242A281360000 offsetMsgId:C0A8010300002A9F000000000007BF94
如果我们把offsetMsgId当做方法参数传入,也可以查询到相同的结果。这是因为,在方法内部实际上是分两步进行查询的: 先把参数当做offsetMsgId,即Message Id进行查询 如果失败,再尝试当做Unique Key进行查询。
源码如下所示:
DefaultMQProducer#viewMessage(String,String) @Override public MessageExt viewMessage(String topic, String msgId) {//省略异常声明 try { //1 尝试当做offsetMsgId进行查询 MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId); return this.viewMessage(msgId); } catch (Exception e) { //查询失败直接忽略 } //2 尝试当做UNIQ_KEY进行查询 return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); }
前面提到,Unique Key只是从逻辑上代表一条消息,实际上在Broker端可能存储了多条,因此在当做Unique Key进行查询时,会进行过滤,只取其中一条。源码如下所示:
MQAdminImpl#queryMessageByUniqKey public MessageExt queryMessageByUniqKey(String topic,String uniqKey) { //根据uniqKey进行查询 QueryResult qr = this.queryMessage(topic, uniqKey, 32, MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true); //对查询结果进行过滤,最多只取一条 if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { return qr.getMessageList().get(0); } else { return null; } }
我们也可以通过另外只接收一个参数的viewMessage方法进行查询,但是需要注意的是,参数只能是offsetMsgId,不能是Unique Key。 String offsetMsgId = "C0A8010300002A9F000000000007BF94"; producer.viewMessage(offsetMsgId);
queryMessage方法:
其是根据消息Key进行查询,这里不再介绍API如何使用。则与前面两种viewMessage方法重载不同,其返回的是一个QueryResult对象,包含了多条消息。
主要是注意这个方法接收时间范围参数,相比较于管理平台更加灵活。管理平台按照消息Key查询,默认最多返回64条消息,且不能支持指定时间范围,如果消息Key重复度较高,那么可能有些消息搜索不到。如果是在指定时间范围内返回64条消息,如果没有发现想找到的消息,再选择其他时间范围,则可以规避这个问题。

3 实现原理
Unqiue Key & Message Key都需要利用RocketMQ的哈希索引机制来完成消息查询,由于建立索引有一定的开销,因此Broker端提供了相关配置项来控制是否开启索引。关于RocketMQ索引机制将在后面的文章进行详细的介绍。
Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息,可以精确匹配一条消息,查询消息更好。下面分别介绍 Unqiue Key & Message Id的生成和作用。
3.1 Unique Key的生成与作用
3.1.1 Unique Key生成
Unique Key是生产者发送消息之前,由RocketMQ 客户端自动生成的,具体来说,RocketMQ发送消息之前,最终都要通过以下方法:
DefaultMQProducerImpl#sendKernelImpl private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) {//省略异常声明 //...略 try { //如果不是批量消息,则生成Unique Key if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } //...略
如上所示,如果不是批量消息,会通过 MessageClientIDSetter 的 setUniqID 方法为消息设置Unique key,该方法实现如下所示:
MessageClientIDSetter#setUniqID public static void setUniqID(final Message msg) { // Unique Key不为空的情况下,才进行设置 if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,) == null) { msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,, createUniqID()); } }

如果消息的Unique Key属性为null,就通过createUniqID()方法为消息创建一个新的Unique Key,并设置到消息属性中。之所以要判断Unique Key是否为null与其作用有关。
3.1.2 Unique Key作用
了解Unique Key的作用对于我们理解消息重复的原因有很大的帮助。RocketMQ并不保证消息投递过程中的Exactly Once语义,即消息只会被精确消费一次,需要消费者自己做幂等。而通常导致消息重复消费的原因,主要包括: 生产者发送时消息重复:RocketMQ对于无序消息发送失败,默认会重试2次。对于有序消息和普通有序消息为什么不进行重试,可参考: RocketMQ NameServer详解 消费者Rebalance时消息重复:这里不做介绍,可参考 RocketMQ Rebalance机制详解
导致生产者发送重复消息的原因可能是:一条消息已被成功发送到服务端并完成持久化,由于网络超时此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败,此时生产者将再次尝试发送消息。
在重试发送时,sendKernelImpl会被重复调用,意味着setUniqID方法会被重复调用,不过由于setUniqID方法实现中进行判空处理,因此重复设置Unique Key。在这种情况下,消费者后续会收到两条内容相同并且 Unique Key 也相同的消息(offsetMsgId不同,因为对Broker来说存储了多次)。
那么消费者如何判断,消费重复是因为重复发送还是Rebalance导致的重复消费呢?
消费者实现MessageListener接口监听到的消息类型是MessageExt,可以将其强制转换为MessageClientExt,之后调用getMsgId方法获取Unique Key,调用getOffsetMsgId获得Message Id。如果多消息的Unique Key相同,但是offsetMsgId不同,则有可能是因为重复发送导致。
3.1.3 批量发送模式下的Unique Key
DefaultMQProducer提供了批量发送消息的接口: public SendResult send(Collection msgs)
在内部,这批消息首先会被构建成一个MessageBatch对象。在前面sendKernelImpl方法中我们也看到了,对于MessageBatch对象,并不会设置Unique Key。这是因为在将批量消息转换成MessageBatch时,已经设置过了。
可能有一部分同学会误以为一个批量消息中每条消息Unique Key是相同的,其实不然,每条消息Unique Key都不同。
这里通过一个批量发送案例进行说明: //构建批量消息 ArrayList msgs = new ArrayList<>(); Message msg1 = new Message("Topic_S",("message3").getBytes()); Message msg2 = new Message("Topic_S",("message4").getBytes()); msgs.add(msg1); msgs.add(msg2); //发送 SendResult result = producer.send(msgs); //打印 System.out.println(result);
输出如下所示: SendResult [sendStatus=SEND_OK, msgId=C0A80103583618B4AAC24CDC29F10000,C0A80103583618B4AAC24CDC29F10001, offsetMsgId=C0A80103000051AF00000000000B05BD,C0A80103000051AF00000000000B065B, messageQueue=MessageQueue [topic=Topic_S, brokerName=broker-c, queueId=2], queueOffset=3]
可以看到,此时输出的msgId(即Unique Key)和offsetMsgId都会包含多个值。客户端给批量消息中每条消息设置不同的Unqiue Key,可以参考DefaultMQProducer#batch方法源码: private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { //1 将消息集合转换为MessageBatch msgBatch = MessageBatch.generateFromList(msgs); //2 迭代每个消息,逐一设置Unique Key for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); } //3 设置批量消息的消息体 msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } return msgBatch; }

3.2 Message Id生成
SendResult中的offsetMsgId,即常规意义上我们所说的Message Id是在Broker端生成的,用于唯一标识一条消息,在根据Message Id查询的情况下,最多只能查询到一条消息。Message Id总共 16 字节,包含消息存储主机地址,消息 Commit Log offset。如下图所示:
RocketMQ内部通过一个 MessageId 对象进行表示: public class MessageId { private SocketAddress address; //broker地址 private long offset; //commit log offset
并提供了一个 MessageDecoder 对象来创建或者解码MessageId。 public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) public static MessageId decodeMessageId(final String msgId)
Broker端在顺序存储消息时,首先会通过createMessageId方法创建msgId。源码如下所示:
CommitLog.DefaultAppendMessageCallback#doAppend public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner) { //1 PHY OFFSET:即Commit Log Offset 或者称之为msgOffsetId long wroteOffset = fileFromOffset + byteBuffer.position(); //2 hostHolder用于维护broker地址信息 this.resetByteBuffer(hostHolder, 8); //3 创建msgOffsetId String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
而客户端在根据msgId向Broker查询消息时,首先会将通过MessageDecoder的decodeMessageId方法,之后直接向这个broker进行查询指定位置的消息。
参见:MQAdminImpl#viewMessage public MessageExt viewMessage(String msgId) {//省略异常声明 //1 根据msgId解码成MessageId对象 MessageId messageId = null; try { messageId = MessageDecoder.decodeMessageId(msgId); } catch (Exception e) { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); } //2 根据MessageId中的Broker地址和commit log offset信息进行查询 return this.mQClientFactory.getMQClientAPIImpl().viewMessage( RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), timeoutMillis); }
由于根据Message Id进行查询,实际上是直接从特定Broker的CommitLog中的指定位置进行查询的,属于精确匹配,并不像用户设置的key,或者Unique Key那么样,需要使用到哈希索引机制,因此效率很高。

4 总结 RocketMQ提供了3种消息查询方式:Message Key & Unique Key & Message Id RocketMQ提供了3种消息查询工具:命令行、管理平台、客户端API,且支持将查询到让特定/所有消费者组重新消费 RocketMQ有意对用户屏蔽Unique Key & Message Id区别,很多地方二者可以通用 Message Key & Unique Key 需要使用到哈希索引机制,有额外的索引维护成本 Message Id由Broker和commit log offset组成,属于精确匹配,查询效率更好
免费学习视频欢迎关注云图智联: https://e.yuntuzhilian.com/
软件开发
2020-07-21 15:12:00
程序开始启动
当系统上电后根据BOOT的引导配置选择启动方式,默认是Flash启动,这时系统开始把所有的代码段搬到RAM中去运行
CPU就从内存中(RAM)获取数据和指令,根据相关指令来控制系统运行
数据存放位置:除了特定IO操作存到EEPROM里面,其他变量的使用全在RAM区,其中就有堆和栈。堆是由用户手动分配malloc,在整个程序运行期间都有效除非手动释放free。而栈在一个函数体内存在,例如main方法由系统自动创建,如果出现递归创建则很容易导致栈溢出,当系统执行完该函数功能后,栈空间数据也由系统自动销毁。

ROM和RAM数据比较
堆和栈的实际定义大小可查看,在.map文件中
总结 内存大小直接体现该系统所能同时运行的任务数,而 所有系统运行中产生的数据存放位置全都在RAM区 。除了读写磁盘
软件开发
2020-07-21 15:07:00
关于新的activiti新团队与原有的团队重要开发人员我们罗列一下,细节如下:
Tijs Rademakers,算是activiti5以及6比较核心的leader了。现在是flowable框架的leader。
Joram Barrez 算是activiti5以及6比较核心的leader了。目前从事flowable框架开发。
Salaboy Activiti Cloud BPM leader(Activiti Cloud BPM 也就是目前的activiti7框架)
Tijs Rademakers以及Salaboy目前是两个框架的leader。
特此强调一点:activiti5以及activiti6、flowable是Tijs Rademakers团队开发的。
Activiti7是 Salaboy团队开发的。activiti6以及activiti5代码目前有 Salaboy团队进行维护。因为Tijs Rademakers团队去开发flowable框架了,所以activiti6以及activiti5代码已经交接给了 Salaboy团队(可以理解为离职之前工作交接)。目前的activiti5以及activiti6代码还是原Tijs Rademakers原有团队开发的。Salaboy团队目前在开发activiti7框架。对于activiti6以及activiti5的代码官方已经宣称暂停维护了。activiti7就是噱头 内核使用的还是activiti6。并没有为引擎注入更多的新特性,只是在activiti之外的上层封装了一些应用。
注意:activiti6的很多框架bug在flowable框架中已经修复的差不多了。
activiti5以及ativiti6的核心开发团队是Tijs Rademakers团队。activiti6最终版本由Salaboy团队发布的。
activiti6核心代码是Tijs Rademakers团队开发的,为何是Salaboy团队发布的呢?很简单,因为这个时候Tijs Rademakers团队已经去开发flowable去了。flowable是基于activiti-6.0.0.Beta4 分支开发的。下面我们截图一些flowable的发展。
目前Flowable已经修复了activiti6很多的bug,可以实现零成本从activiti迁移到flowable。
flowable目前已经支持加签、动态增加实例中的节点、支持cmmn、dmn规范。这些都是activiti6目前版本没有的。
1、flowable已经支持所有的历史数据使用mongdb存储,activiti没有。
2、flowable支持事务子流程,activiti没有。
3、flowable支持多实例加签、减签,activiti没有。
4、flowable支持httpTask等新的类型节点,activiti没有。
5、flowable支持在流程中动态添加任务节点,activiti没有。
6、flowable支持历史任务数据通过消息中间件发送,activiti没有。
7、flowable支持java11,activiti没有。
8、flowable支持动态脚本,,activiti没有。
9、flowable支持条件表达式中自定义juel函数,activiti没有。
10、flowable支持cmmn规范,activiti没有。
11、flowable修复了dmn规范设计器,activit用的dmn设计器还是旧的框架,bug太多。
12、flowable屏蔽了pvm,activiti6也屏蔽了pvm(因为6版本官方提供了加签功能,发现pvm设计的过于臃肿,索性直接移除,这样加签实现起来更简洁、事实确实如此,如果需要获取节点、连线等信息可以使用bpmnmodel替代)。工作流框架项目源码:www.1b23.com
13、flowable与activiti提供了新的事务监听器。activiti5版本只有事件监听器、任务监听器、执行监听器。
14、flowable对activiti的代码大量的进行了重构。
15、activiti以及flowable支持的数据库有h2、hsql、mysql、oracle、postgres、mssql、db2。其他数据库不支持的。使用国产数据库的可能有点失望了,需要修改源码了。
16、flowable支持jms、rabbitmq、mongodb方式处理历史数据,activiti没有。
软件开发
2020-07-21 14:22:06
大家好啊,今天栈长给大家分享下我的开发历程中,我知道的那些被淘汰的技术或者框架,有些我甚至都没有用过,但我知道它曾经风光过。
废话不多说,下面我要开始吹了……
1、Swing
下面这个是用 swing 开发的:
图来源网络,有没有似曾相识的感觉?懂的自然懂!
栈长去年中秋也用过 swing:
这个中秋,我用 Java 画了一个月饼!
Swing 算是 Java 早期代替 AWT 的桌面应用 GUI 开发工具包,一个听到就已经淘汰的技术,给我的感觉就是丑丑丑!现在与 AWT 一起在时间这个长河里长眠。
如果 Java GUI 库发展历程分为三代,可以是: AWT > SWING > JAVAFX
随着 JavaFx 的发布,加速 SWING 的被淘汰。下面这个是用 JavaFx 开发的:
图来源:zhihu.com/question/54498643/answer/271632290
现在 JavaFx 也有十来年了,虽然这篇帖子也在说 JavaFx 淘汰了的,只是现在桌面应用不是主流吧,我也没用过不敢乱说,JavaFx 在桌面应用开发应该还是有一席之地的。
2、JSF JSF:Java Server Faces
JSF是一种用于构建 Java Web 应用程序的表现层框架,和 Struts 一样性质的框架。
图来源: https://javabeat.net/jsf-2/
国内用 JSF 的比较少,有也是老系统了,国外应该还有用 JSF 的,不过随着 Spring MVC, Spring Boot 的横空出世,JSF 应该也是过时的技术了。
3、EJB
EJB也是个神器,只见其影,未见其身。前些年,在网上各个面试题还有它的身影,现在估计很难见到了。 EJB:Enterprise Java Beans,即:企业Java Beans
Sun公司发布的文档中对 EJB 的定义是:EJB 是用于开发和部署多层结构的、分布式的、面向对象的 Java 应用系统的跨平台的构件体系结构。
简单来说,EJB就是部署分布式系统用的,就是把A程序放在服务器上,通过B客户端来调用,并且是跨平台的。
图来源:oreilly.com
因为 EJB 过于复杂和笨重,调试非常麻烦,现在都被轻量级的 RPC 框架(Dubbo)及轻量级 Restful 形式的分布式框架 (Spring Cloud) 替代了。关注微信公众号Java技术栈在后台回复分布式可以获取分布式架构系列教程。
4、JSP
JSP 全称:Java Server Pages,是由早期的 Sun 公司发布的一种动态网页开发技术,即在 HTML 网页代码中嵌入 JSP 标签的 Java 代码实现动态网页。
JSP 代码示例: 微信公众号Java技术栈 <% out.println("Hello, Java技术栈!"); %>
这个示例只是简单的调用 JSP 的内置 out 对象在页面输出展示一句话。
JSP 的本质其实就是 Servlet,JSP 文件被编译之后,就变成了 Servlet Java 类文件,因为 JVM 虚拟机只能识别 Java 字节码文件,而不能识别 JSP 文件。
在 JSP 的时代,那时候还没有前后端分离的说法,JSP 可以包揽全部,即实现静态页面,又实现动态代码逻辑,全部都在一个 JSP 文件里面。这样,一个程序员既是前端,又是后端。
但是,现如今在前后端分离的热潮下,前后端分工明确,后端只负责业务逻辑的接口开发,前端负责调用后端接口再做页面数据封装展示,JSP 几乎是被淘汰了。
虽然 JSP 是被前后端分离取代了,但并不说明 JSP 没有用了,不是所有系统都是前后端分离的,比如说一个只有两三个页面的动态系统,JSP、Servlet足以搞定,你总不能上页面模板引擎、各种框架,或者再上前后端分离吧?
5、Struts
Struts2 那些年可谓是风光无限啊,Struts2 + Spring + Hibernate 三大框架一起组成了 " SSH "————牛逼哄哄的 Java Web 框架三剑客。
Struts 这篇就不多说了,具体看这篇: Struts2 为什么被淘汰?
6、Memcached
Redis 这几年的大热,现在已经替代 Memcached 成为缓存技术的首要中间件,作为大厂的带头兵,在 BAT 里面,Redis 也已经逐渐取代了 Memcached,广泛使用 Redis 作为缓存应用方案。
为什么 Redis 能后来居上呢?关注微信公众号Java技术栈在后台回复redis可以获取 Redis 系列教程、
1)速度更快
Memcached 使用的是多线程模型,既然是多线程,就会因为全局加锁而带来性能损耗。而 Redis 使用的是单线程模型,没有锁竞争,速度非常快。
相关阅读: Redis 到底是单线程还是多线程?
2)数据类型更丰富
Memcached 数据类型非常单一,只支持 String 数据类型,在业务实现上就非常有瓶颈。
而 Redis 支持 string(字符串)、hash(哈希)、list(列表)、set(集合)、zset(sorted set:有序集合) 等……丰富的数据类型可以让 Redis 在业务上大展拳脚。
这也是 Redis 能代替 Memcached 最重要的原因之一。
相关阅读: Redis 的 8 大应用场景!
并且,Memcached 值最大上限为:1M,而 Redis 最大可以到:1GB。
3)数据持久化
Memcached 不支持持久化,Redis 支持。
缓存服务器断电后,Memcached 的数据是不能恢复的,而 Redis 可以将数据保久化在磁盘中,服务器重启的后可以加载再次使用,不会造成数据断电丢失。
比如,有些数据是直接放在缓存数据库中的,其他地方可能没有备份,如果丢失了,那可能会造成业务影响,这也是 Redis 非常有用的一个保障特性。
总结
好了,今天栈长列举了 6 个经典的即将被淘汰的技术或框架,虽然这些技术现在面临淘汰,但它们曾经也风光过,值得敬畏。
另外,虽然这些技术要被淘汰了,但不说明它们没有用了,它们依然在被运用,只是现在不是主流了。
最后,在大家的开发历程中,你都遇到过哪些曾经很风光,但现在即将被淘汰的技术呢?欢迎大家留言分享讨论~
关注公众号Java技术栈回复"面试"获取我整理的2020最全面试题及答案。
推荐去我的博客阅读更多:
1. Java JVM、集合、多线程、新特性系列教程
2. Spring MVC、Spring Boot、Spring Cloud 系列教程
3. Maven、Git、Eclipse、Intellij IDEA 系列工具教程
4. Java、后端、架构、阿里巴巴等大厂最新面试题
觉得不错,别忘了点赞+转发哦!
软件开发
2020-07-21 14:17:00
场景
延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时1分钟的消息,消息发送后立即发送给服务器,但是服务器在1分钟后才将该消息交给消费者。

这种延时消息有一些什么应用场景呢?比如在电商网站上我们购物的时候,下单后我们没有立即支付,这个时候界面上往往会提醒你如果xx分钟还未支付订单将被取消。对于这么一个功能如果不使用延时消息,那我们就需要使用类似定时任务的功能,比如每分钟我们跑一个定时任务对订单表进行扫描,将未支付订单扫出,如果从下单时间到现在已经超过了45分钟则将该订单取消。但是定时扫描有一个问题是效率不高,如果订单很多将会严重的影响db的性能。如果使用延时消息就没有这样的问题了,只需要发送一条延时xx分钟的的延时消息即可,在消息里携带有订单号,xx分钟后消费者收到该消息检查对应订单状态做出对应处理,这种方式将大大减轻对db的压力,实现起来也更优雅。

上面描述的是一种延时时间固定的场景,还有一些是要指定时间执行。比如买了一张一周后北京去东京的机票,那么在乘机时间到来之前可能要发送数次提醒的短信给用户,那么我们也可以在用户下单后发送一条延时消息,延时到乘机时间之前发送。

需求
有了场景,我们首先来分析一下需求: 延时时间是不固定的,比如我们无法预测用户订未来多久的机票,所以我们不能仅仅提供几种不同延时单位的延时功能。 延时时间精确在秒这个级别就可以了,不需要精确到1秒以内。 最大的延时时间应该有个度。比如最大延时1年或2年(可能有同学问难道不能提供任意最大延时时间么?任意最大延时时间会增加系统的实现的复杂度,而在实际中并没有什么用处,一般我们都尽量不推荐延时太久的消息,因为系统在不断地演变,比如当前设计的时候消息是延时两年,但是两年后系统早已大变样了,两年前的消息都不一定有人记得,更别人说兼容两年前的消息格式了)。
有了上面的限定,我们来讨论一下延时消息的设计。

设计
延时说白了就是一个定时任务的功能,指定一个未来的时间执行消息投递的任务,时间到了再将消息投递出去。
如果遇到定时任务的场景往往会有这么几个方案来考虑: 优先级队列(堆) 比如JAVA里的ScheduledThreadPoolExecutor。定时任务都丢到一个优先级队列里,按照到期时间进行排序,线程池从队列里取任务出来执行,算法复杂度是O(logN)。 扫描 所有任务都放到一个List里,然后一个死循环,比如每100ms执行一次,扫描List里所有任务,当某任务到期后取出执行。这种方式实现简单,算法复杂度是O(N),如果任务太多的话效率会很低,适合任务比较少的场景。 hash wheel 按照任务的到期时间将任务放到一个刻度盘里,比如未来1秒的放到位置1,未来2秒的放到位置2,依次类推。每次刻度盘转一个刻度,转到该可读则将该刻度上所有任务执行,算法复杂度是O(1)。

上面这三种方式都是基于内存的数据结构,也就是我们得将所有任务都放到内存里,如果用在延时消息上,显然是不现实的,实际上也是没有必要的。如果这个消息是几个小时后需要投递,我们为什么需要现在就将其加载进来一直占着内存呢?看起来我们只需要提前一段时间加载未来某段时间需要投递的消息即可。比如我们将消息按照一个小时为一个段,每次只加载一个段的消息到内存里。其实我们可以用一个很形象的比如来描述这种结构:两层时间轮(hash wheel)。第一层hash wheel位于磁盘上,精度较粗,每个小时为1个刻度。第二层hash wheel位于内存里,只包含第一层hash wheel一个刻度的数据,精度为1秒。
但是我们怎么去加载这些需要的消息将其组织为第一层hash wheel呢?消息接收后存储到一个顺序的log文件,消息接收的顺序和消息的延时时间之间是没有任何关系的。比如现在收到了一条消息,是1个小时后需要投递,稍后收到一条消息可能是5分钟之后投递。我们加载时候是按照延时时间进行加载的,比如我们需要加载未来一个小时需要投递的消息:
比如上图所示,3 seconds是最近要投递的消息,然后是5minutes,而排在最头上的是1个小时后要投递的。我们不可能每次要预加载的时候都从头扫描一遍,然后将需要的消息加载。
怎么办呢?对于需要快速查找,我们肯定会想到建立索引。那么我们只需要按照我们的预加载的时间段划分索引即可了,比如我们建立2019021813, 2019021814...这样的索引文件,文件里每一个entry就是一个索引,该索引包含以下结构: index:
schedule time: int64
offset: int64
offset是指向前面log的偏移,而schedule time是消息的到期时间。这样我们每次只需要加载一个段(比如2019021813)的索引到内存就行了,内存中的hashwheel根据schedule time决定到期时间,到期后根据offset读取到消息内容将消息投递出去。
这个存储结构到这里基本上就ok了,但是存在一个落地实施的问题(磁盘的空间是有限的):如果一开始收到一条消息是6个月之后投递的,后面收到了一些一个小时内投递的,实际上只要消息投递后我们就可以将消息删除了,这样可以大大的节约内存空间,但是因为log的头部有一条6个月之后的消息,所以我们还不能将该log删除掉,也就是至少6个月我们不能删除消息,除非我们按照消息来删除,也就是将6个月后的消息保留下来,而一个小时内已经投递了的消息删除掉(一种compact机制),但是这种实现就变得很复杂。
其实换个方式就简单了,在前面我们按照每个时间段建立索引文件,那么如果我们不仅仅建立索引呢?也就是索引文件里不仅仅是索引,而是包括完整的消息:消息收到后先进入一个按照接收顺序的log(qmq里称之为message log),然后回放该log,按照log里每条消息的投递时间将消息放到对应的时间段上(qmq里称之为schedule log),这样只要回放完成后message log里的消息就可以删除了,message log只需要保留很少的内容,而schedule log是按照投递时间段来组织的,已经投递过的时间段也可以立即删除了。通过这种变化我们顺利的解决了磁盘占用问题,另外还有一个副产品:读写分离。我们可以将延时消息里的message log放到小容量高性能的SSD里,提高消息发送的吞吐量和延时,而将schedule log放到大容量低成本的HDD里,可以支撑时间更久的延时消息(下图即延时消息的存储结构):
其他细节
1. Server重启如何发现未投递消息
在这里还有一些具体实现细节需要处理。虽然我们按照每个时间单位重新组织了消息(schedule log),但是在该时间段内的消息并不是按照投递时间排序的。比如每个小时为一个时间段,那么可能第59分钟的消息排在最前面,而几秒内需要投递的排在最后面,那如果某个时间段内的消息正在投递时应用突然挂掉了,那么再次恢复的时候我们并不能准确的知道消息投递到哪儿了。所以我们增加了一个dispatch log,dispatch log在消息投递完成后写入,dispatch log里每一个entry记录的是schedule log里的offset,表示该offset的消息已经投递,当应用重启后我们会对比schedule log和dispatch log,将未投递的消息找出来重新加载投递,dispatch log相当于一个位图数据结构。

2. 正在加载某个时间段内的消息过程中又来了属于该时间段内消息如何处理,会不会重复加载
在我们决定加载某个时间段消息时(正在加载的时间段称之为current loading segment),我们首先会取得该时间段文件的最大offset,然后加载只会加载这个offset范围内的消息(qmq内称之为loading offset),而加载过程中如果又来了该时间段内消息,那这个消息的offset也是>loading offset: if( message.offset in current loading segment && message.offset > loading offset){
add to memory hash wheel
}

3. 加载一个时间段内的消息是不是需要占用太多的内存
实际上我们并不会将schedule log里完整的消息加载到内存,只会加载索引到内存,根据前面的介绍,每个索引是16个字节(实际大小可以参照代码,略有出入)。假设我们使用1G内存加载一个小时索引的话,则可以装载1G/16B = (1024M * 1024K * 1024B)/(16B) = 67108864 条消息索引。则每秒qps可以达到18641(67108864 / 60 / 60)。如果我们想每秒达到10万qps,每个小时一个刻度则需要5493MB,如果觉得内存占用过高,则可以相应的缩小时间段大小,比如10分钟一个时间段,则10万qps只需要占用915MB内存。通过计算可知这种设计方式还是在合理的范围内的。
免费学习视频欢迎关注云图智联: https://e.yuntuzhilian.com/
软件开发
2020-07-21 13:52:00
本文首先会对rocketmq集群四种部署模式进行介绍,包括:单主模式,多master模式,多master多slave模式异步复制,多master多slave模式同步复制,对比各种模式的优缺点。接着将每一种模式部署成一个集群,因此总共有4个集群,由一个NameServer集群进行管理。最后会介绍常见部署错误问题的解决方案。
1 部署模式介绍
一个完整的RocketMQ集群由NameServer集群,Broker集群,Producer集群,Consumer集群组成。本节主要介绍NameServer集群,Broker集群的搭建。
一个NameServer集群可以管理多个Broker集群,每个Broker集群由多组broker复制组构成,多个broker复制组通过指定相同的集群名称,来构成一个Broker集群。
具体来说,每个broker复制组都满足以下几点: 包含一个master节点,零个或者多个slave节点,且这些节点需要指定相同的broker名称;不同的broker复制组的broker名称必须不同。 master和slave通过brokerId参数进行区分。master的 brokerId参数必须是 0,slave 的 brokerId 必须是大于0的整数,如果有多个slave,这些slave的brokerId需要指定为不同的值。 master可读可写,slave只可以读,master通过主从复制的方式将数据同步给slave,支持同步复制和异步复制两种复制方式,目前master宕机后,slave不能自动切换为master。
基于Broker复制组的特性,一个Broker集群通常有多种部署方式:
1. 单个 Master
集群中只有一个broker复制组,且只包含一个master节点。这种方式部署风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,通常是开发调试时使用,不建议线上环境使用
2. 多 Master 模式
集群中有多个broker复制组,且都只有master节点,没有slave节点。例如 2 个 master 或者 3 个 master节点。
优点: 配置简单,单个 Master 宕机或重启维护对应用无影响,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点: 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
3 多 Master 多 Slave 模式,异步复制
集群中有多个broker复制组,且每个复制组都有master节点,也有slave节点。例如:每个 master 配置一个 slave。HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点: 即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以 从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
4. 多 Master 多 Slave 模式,同步复制
与第三种方式类似,不同的是,HA 采用同步复制,生产者发送发送消息时,只有再主备都写成功,才向应用返回成功。
优点: 数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点: 性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。
接下来,笔者将将演示在Linux操作系统中如何搭建一个单节点NameServer集群,以及上述四种Broker集群,并由这个单节点的NameServer集群来管理这四个Broker集群。
注意:在实际生产环境中,NameServer以及每个Broker节点(不管是master还是slave),都是部署在不同的机器上的。这里简单起见,将通过伪分布式的方式进行搭建,即所有节点都运行在一台机器上。如果读者希望搭建完整的分布式集群,可以使用vmvare/virtualbox等工具,只需要将本文的配置拷贝即可。

2 前提条件
wRocketMQ NameServer和Broker是基于Java 开发的,需要安装JDK,且需要保证二者版本的匹配。下图列出安装/运行RocketMQ需要的JDK版本。
Version Client Broker NameServer 4.0.0-incubating >=1.7 >=1.8 >=1.8
4.1.0-incubating >=1.6 >=1.8 >=1.8
4.2.0 >=1.6 >=1.8 >=1.8
4.3.x >=1.6 >=1.8 >=1.8
4.4.x
4.5.x 4.6.x
>=1.6
>=1.6 >=1.6
>=1.8
>=1.8 >=1.8
>=1.8
>=1.8 >=1.8
本文以RocketMQ 4.6.0版本为例进行讲解,对应JDK版本为1.8。本文不讲解JDK如何安装,读者可自行查阅相关资料。确保JDK的版本>=1.8,可以通过如下方式验证: $ java -version java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode

3 下载安装
下载
该地址列出了RocketMQ所有发布的版本: https://github.com/apache/rocketmq/releases
这里将RocketMQ安装到Linux文件系统的 /opt 目录,首先进入/opt目录 cd /opt
可以直接从github下载,但是网速较慢 $ wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.6.0.zip
网速慢的同学也可以从国内镜像下载: $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
下载之后进行解压缩: $ unzip rocketmq-all-4.6.0-bin-release.zip
解压目录说明 rocketmq-all-4.6.0-bin-release ├── benchmark #性能测试脚本 ├── bin #命令行工具 ├── conf #配置文件目录 ├── lib #依赖的第三方类库 ├── LICENSE ├── NOTICE └── README.md
设置ROKCKET_HOME环境变量:
在这里,将我们将RocketMQ安装包的解压目录设置为ROCKETMQ_HOME环境变量。例如笔者的解压目录为: $ pwd /opt/rocketmq-all-4.6.0-bin-release
为了以后升级方便,我们创建一个软连接: sudo ln -s /opt/rocketmq-all-4.6.0-bin-release rocketmq
修改 /etc/profile ,添加以下一行: export ROCKETMQ_HOME=/opt/rocketmq
执行以下命令,使得环境变量生效 source /etc/profile
验证环境变量生效: $ echo $ROCKETMQ_HOME /opt/rocketmq

3.1 启动NameServer
启动 $ nohup sh bin/mqnamesrv &
验证启动成功 $ jps -l 3961157 sun.tools.jps.Jps 3953057 org.apache.rocketmq.namesrv.NamesrvStartup #NameServer进程
NameServer默认监听 9876 端口,也可以通过如下方式验证: $ lsof -iTCP -nP | grep 9876 java 3953057 tianshouzhi.robin 65u IPv6 134849198 0t0 TCP *:9876 (LISTEN)
设置 NAMESRV_ADDR 环境变量,修改etc/profile,添加以下内容: export NAMESRV_ADDR=localhost:9876
并执行"source /etc/profile"使得其生效
3.2 启动Broker
${ROCKETMQ_HOME}/conf目录下,提供了我们讲解到的RocketMQ四种部署模式的demo配置文件,如下所示: conf ├── 2m-2s-async //多Master多Slave模式,异步复制 │ ├── broker-a.properties │ ├── broker-a-s.properties │ ├── broker-b.properties │ └── broker-b-s.properties ├── 2m-2s-sync //多Master多Slave 模式,同步复制 │ ├── broker-a.properties │ ├── broker-a-s.properties │ ├── broker-b.properties │ └── broker-b-s.properties ├── 2m-noslave //多Master模式 │ ├── broker-a.properties │ ├── broker-b.properties │ └── broker-trace.properties └── broker.conf //单Master模式
在实际生产环境中,你可以选择其中一种模式进行部署。从学习的角度,笔者将详细讲解每一种模式,每种模式部署为一个集群,因此总共会部署4个集群。
另外,生产环境中至少需要部署为双主模式,每个机器只会部署一个broker,因此只使用broker.conf配置文件即可,根据要配置的节点的类型,将其他模式下的配置复制到broker.conf,或者直接修改broker.conf。

3.2.1 单Master模式
修改配置文件:
单master模式可以使用conf目录下的broker.conf 配置文件,内容如下所示: #集群名称 brokerClusterName=single-master #broker复制组名称 brokerName=broker-a #nameserver地址 namesrvAddr=127.0.0.1:9876 #brokerId,因为是master节点,所以这里设置为0 brokerId=0 #监听端口 listenPort=10911 #rocketmq定时清除 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=48 #broker角色,异步复制 brokerRole=ASYNC_MASTER #异步刷盘 flushDiskType=ASYNC_FLUSH #存储目录 storePathRootDir=/data/rocketmq/single-master/broker-a/store storePathCommitLog=/data/rocketmq/single-master/broker-a/store/commitlog
注意:如果配置项名称或者值写错,broker启动时并不会报错,会使用默认值替代,常见错误:如在=号两边加了空格,这里是不需要的。
启动通过bin目录下的 mqbroker 脚本。由于默认的配置,启动后会立即占用8G内存,如果机器内存不够,可以修改bin/runbroker.sh,找到以下这一行: JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
将其修改为: JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
启动: $ nohup sh bin/mqbroker -c conf/broker.conf &
注意:broker启动时不会读取broker.conf中的配置,尽管也可以启动,但是如果需要使得配置文件生效,必须通过-c参数进行指定。
验证启动成功: $ jps -l 3961157 sun.tools.jps.Jps 3960977 org.apache.rocketmq.broker.BrokerStartup 3953057 org.apache.rocketmq.namesrv.NamesrvStartup
NameServer默认监听在 10911 端口,也可以通过以下方式验证: $ lsof -iTCP -nP | grep 10911 java 37686 tianshouzhi.robin 107u IPv6 137040246 0t0 TCP *:10911 (LISTEN)
如果启动失败,可以通过以下命令查看错误的具体信息: tail -200f ~/logs/rocketmqlogs/broker.log

测试发送/消费消息
安装包bin目录下提供了一个 tools.sh 工具,我们可以通过其来测试发送/接收消息。
测试发送消息:
执行以下命令将会往一个名为 TopicTest 主题中发送1000条消息 $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId=FDBDDC0300FF00010001022700120225003C3D4EAC696720298203E7, offsetMsgId=AC11000100002A9F0000000000037567, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], wqueueOffset=249] ...
测试消费消息:
执行以下命令,将会之前的消费1000条消息 $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt… ...
这里我们是通过命令行工具来发送/消费消息,在后文中,我们将介绍如何通过API的方式来完成相同的功能。

查看集群列表信息: $ sh bin/mqadmin clusterList -n localhost:9876 #Cluster Name #Broker Name #BID #Addr #Version #...(略) single-master broker-a 0 192.168.1.3:10911 V4_6_0 …
输出的每一列说明如下: Cluster Name:集群的名称,即brokerClusterName配置项的值 Broker Name:Broker的名称,即brokerName配置项的值 BID:Broker的ID,这里显示为0,即brokerId配置项的值 Addr:监听的IP/端口,供生产者/消费者访问,端口即listenPort配置项的值 Version:broker的版本

3.2.2 多Master模式
这里演示的多master模式是双主模式:包含2个master节点,没有slave节点。如前所属,这里是伪分布式,在一台机器上启动两个master节点。我们需要对 conf/2m-noslave 目录下的2个配置文件进行一些修改,否则会与前面搭建的单master模式存在一些冲突,如监听的端口和保存数据的路径等。
修改后的结果如下所示:
conf/2m-noslave/broker-a.properties brokerClusterName=2m-noslave listenPort=11911 namesrvAddr=127.0.0.1:9876 brokerName=2m-broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-noslave/broker-a/store/ storePathCommitLog=/data/rocketmq/2m-noslave/broker-a/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-noslave/broker-a/store/consumequeue/
conf/2m-noslave/broker-b.properties brokerClusterName=2m-noslave listenPort=12911 namesrvAddr=127.0.0.1:9876 brokerName=2m-broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-noslave/broker-b/store/ storePathCommitLog=/data/rocketmq/2m-noslave/broker-b/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-noslave/broker-b/store/consumequeue/
在这里,我们将两个配置文件中的brokerClusterName都改成了2m-noslave,表名这两个broker节点将组成一个新的集群。也别修改了listenPort配置项以监听不同的端口,此外,我们修改了三个storePath前缀的配置项,将数据存储到不同的目录中。
特别需要注意的是:一些同学可能认为brokerClusterName已经不同了,没有必要修改brokerName配置项,这是一种误解。在RocketMQ中,一个NameServer集群可以多个Broker集群,但是broker集群的名称并没有起到命名空间的作用,因此管理的所有Broker集群下的broker复制组的名称都不能相同。
启动broker-a nohup sh bin/mqbroker -c conf/2m-noslave/broker-a.properties &
启动broker-b nohup sh bin/mqbroker -c conf/2m-noslave/broker-b.properties &
在启动之后,当我们在查看集群列表信息时,如下: $ sh bin/mqadmin clusterList -n localhost:9876 #Cluster Name #Broker Name #BID #Addr #Version single-master broker-a 0 192.168.1.3:10911 V4_6_0 2m-noslave 2m-broker-a 0 192.168.1.3:11911 V4_6_0 2m-noslave 2m-broker-b 0 192.168.1.3:12911 V4_6_0
这里显示了2个broker集群:single-master和2m-noslave,其中后者存在两个节点。

3.2.3 多 Master 多 Slave 模式,异步复制
该模式需要使用conf/2m-2s-async目录下的四个配置文件。同样我们需要修改brokerClusterName,listenPort,brokerName以及存储路径。特别需要注意的是对于slave,其brokerRole配置项需要为SLAVE,brokerId是需要时一个大于0的值。
修改后的结果如下所示:
conf/2m-2s-async/broker-a.properties brokerClusterName=2m-2s-async listenPort=13911 namesrvAddr=127.0.0.1:9876 brokerName=2m-2s-async-broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-2s-async/broker-a-0/store/ storePathCommitLog=/data/rocketmq/2m-2s-async/broker-a-0/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-a-0/store/consumequeue/
conf/2m-2s-async/broker-a-s.properties brokerClusterName=2m-2s-async listenPort=14911 namesrvAddr=127.0.0.1:9876 brokerName=2m-2s-async-broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-2s-async/broker-a-1/store/ storePathCommitLog=/data/rocketmq/2m-2s-async/broker-a-1/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-a-1/store/consumequeue/
conf/2m-2s-async/broker-b.properties brokerClusterName=2m-2s-async listenPort=15911 namesrvAddr=127.0.0.1:9876 brokerName=2m-2s-async-broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-2s-async/broker-b-0/store/ storePathCommitLog=/data/rocketmq/2m-2s-async/broker-b-0/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-b-0/store/consumequeue/
conf/2m-2s-async/broker-b-s.properties brokerClusterName=2m-2s-async listenPort=16911 namesrvAddr=127.0.0.1:9876 brokerName=2m-2s-async-broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH storePathRootDir=/data/rocketmq/2m-2s-async/broker-b-1/store/ storePathCommitLog=/data/rocketmq/2m-2s-async/broker-b-1/store/commitlog/ storePathConsumerQueue=/data/rocketmq/2m-2s-async/broker-b-1/store/consumequeue/
依次启动: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
查看集群信息: $ sh bin/mqadmin clusterList -n localhost:9876 #Cluster Name #Broker Name #BID #Addr #Version single-master broker-a 0 172.17.0.1:10911 V4_6_0 2m-2s-async 2m-2s-async-broker-a 0 172.17.0.1:13911 V4_6_0 2m-2s-async 2m-2s-async-broker-a 1 172.17.0.1:14911 V4_6_0 2m-2s-async 2m-2s-async-broker-b 0 172.17.0.1:15911 V4_6_0 2m-2s-async 2m-2s-async-broker-b 1 172.17.0.1:16911 V4_6_0 2m-noslave 2m-broker-a 0 172.17.0.1:11911 V4_6_0 2m-noslave 2m-broker-b 0 172.17.0.1:12911 V4_6_0
这里多出了2m-2s-async集群的四个broker节点信息。

3.2.4 多 Master 多 Slave 模式,同步复制
该模式需要使用conf/2m-2s-sync目录下的四个配置文件,与异步复制最大的不同是,需要将master节点的brokerRole配置项需要改为SYNC_MASTER。这里不再赘述。如果是在同一台机器上搭建此模式,记得修对应的参数。

3.3 停止
bin目录安装包下有一个 mqshutdown 脚本,其既可以关闭Broker,也可以关闭NameServer。注意该脚本会将本机上启动的所有Broker或所有NameServer关闭。
停止broker $ sh bin/mqshutdown broker The mqbroker(67521 74023 74153 362837 362958 363070) is running... Send shutdown request to mqbroker(67521 74023 74153 362837 362958 363070)
停止nameserver $ sh bin/mqshutdown namesrv The mqnamesrv(3953057) is running... Send shutdown request to mqnamesrv(3953057) OK

4 常见安装错误
错误1:端口已被占用 java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:433) at sun.nio.ch.Net.bind(Net.java:425) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67) at
原因:重复监听了同一个端口,通常是对同一个配置文件启动了多次,或者配置listenPort端口未生效。
错误2:MQ已启动 java.lang.RuntimeException: Lock failed,MQ already started at org.apache.rocketmq.store.DefaultMessageStore.start(DefaultMessageStore.java:222) at org.apache.rocketmq.broker.BrokerController.start(BrokerController.java:853) at org.apache.rocketmq.broker.BrokerStartup.start(BrokerStartup.java:64) at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)
原因:多个配置文件中,可能指定了相同的存储路径,检查配置是否正确。
错误3:配置文件不存在 java.io.FileNotFoundException: conf/2m-2s-async/broker-a-m.properties (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at java.io.FileInputStream.(FileInputStream.java:93) at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:128) at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58)
配置文件不存在,检查对应目录下是否有此文件
错误4:内存分配失败
未按照前文所属修改bin/runserver.sh,bin/runbroker.sh脚本,导致启动每一个节点时占用内存过多。如果本身机器内存就不足,可以不必同时运行这么多模式。
免费学习视频欢迎关注云图智联: https://e.yuntuzhilian.com/
软件开发
2020-07-21 13:41:00
题目:

解题思路:
回溯法解数独
类似人的思考方式去尝试,行,列,还有 3*3 的方格内数字是 1~9 不能重复。
我们尝试填充,如果发现重复了,那么擦除重新进行新一轮的尝试,直到把整个数组填充完成。
https://leetcode-cn.com/problems/sudoku-solver/solution/hui-su-fa-jie-shu-du-by-i_use_python/
代码: class Solution { public void solveSudoku(char[][] board) { // 三个布尔数组 表明 行, 列, 还有 3*3 的方格的数字是否被使用过 boolean[][] rowUsed = new boolean[9][10]; boolean[][] colUsed = new boolean[9][10]; boolean[][][] boxUsed = new boolean[3][3][10]; // 初始化 for(int row = 0; row < board.length; row++){ for(int col = 0; col < board[0].length; col++) { int num = board[row][col] - '0'; if(1 <= num && num <= 9){ rowUsed[row][num] = true; colUsed[col][num] = true; boxUsed[row/3][col/3][num] = true; } } } // 递归尝试填充数组 recusiveSolveSudoku(board, rowUsed, colUsed, boxUsed, 0, 0); } private boolean recusiveSolveSudoku(char[][]board, boolean[][]rowUsed, boolean[][]colUsed, boolean[][][]boxUsed, int row, int col){ // 边界校验, 如果已经填充完成, 返回true, 表示一切结束 if(col == board[0].length){ col = 0; row++; if(row == board.length){ return true; } } // 是空则尝试填充, 否则跳过继续尝试填充下一个位置 if(board[row][col] == '.') { // 尝试填充1~9 for(int num = 1; num <= 9; num++){ boolean canUsed = !(rowUsed[row][num] || colUsed[col][num] || boxUsed[row/3][col/3][num]); if(canUsed){ rowUsed[row][num] = true; colUsed[col][num] = true; boxUsed[row/3][col/3][num] = true; board[row][col] = (char)('0' + num); if(recusiveSolveSudoku(board, rowUsed, colUsed, boxUsed, row, col + 1)){ return true; } board[row][col] = '.'; rowUsed[row][num] = false; colUsed[col][num] = false; boxUsed[row/3][col/3][num] = false; } } } else { return recusiveSolveSudoku(board, rowUsed, colUsed, boxUsed, row, col + 1); } return false; } }
软件开发
2020-07-21 23:40:00
最近看到很多人都在找工作, 而且很多人都感觉今年找工作比去年难很多, 竞争力也增加不少, 因此激发我整理这份资料, 希望能帮到正在找或者准备找工作的童鞋们.
首先我们能否获得一个面试机会, 那肯定是从简历开始, 简历需要做好功夫, 一份好的简历才足够吸引企业得到面试机会, 接着就是面试了, 面试前必须要先做好准备, 多看一下前辈们总结面试题, 有哪一方面不足的地方赶紧补充一下, 还有要了解一下你即将面试那家公司.
一、简历
网上有很多对程序员简历的一些指导,这里就不重述,大家可以搜下网上其他大神的总结,结合自身情况修改下。我有几点建议:
1.尽量不要花哨,程序员和设计师或者产品运营还不一样,我们的简历成功与否决定权还是在技术面试官那,而他们 看重的是你的项目经验内容和技术等描述。
2. 技能描述这块尽量只写你懂得而且理解深刻的, 可以适当加入一些新技术或流行框架,不过这块需要理解,没来得及看源码的可以看看大神们对它的总结,网上一大堆。
3. 项目经验这块尽量加入关键词, 比如使用了什么技术、用到哪些设计模式、优化数据对比、扩展总结之类的。而非一味地介绍这个项目内容(那是产品经理的描述),比如性能优化这块,分为UI性能优化、内存优化、数据库优化、网络优化、耗电优化等等。可以从 1).如何发现问题 2).怎么解决问题 3).解决效果对比,这几个方面去描述。举个简单例子——UI优化,可以从 UI出现什么问题(卡顿不流畅),怎么查找问题(手机开发者权限>GPU过度绘制 发现层级问题,TraceView CPU使用情况分析),怎么解决问题(降低层级、自定义View绘图出现问题等),解决问题后性能再次对比。
二、技能储备
(一)Java
一、HashMap和Hashtable区别?
这个一定要去看源码!看源码!看源码!实在看不下去的可以上网看别人的分析。简单总结有几点:
1.HashMap支持null Key和null Value;Hashtable不允许。这是因为HashMap对null进行了特殊处理,将null的hashCode值定为了0,从而将其存放在哈希表的第0个bucket。
2.HashMap是非线程安全,HashMap实现线程安全方法为Map map = Collections.synchronziedMap(new HashMap());Hashtable是线程安全
3.HashMap默认长度是16,扩容是原先的2倍;Hashtable默认长度是11,扩容是原先的2n+1
4.HashMap继承AbstractMap;Hashtable继承了Dictionary
hashmap1.8之前和之后的版本之间的区别,数据结构?
扩展,HashMap 对比 ConcurrentHashMap ,HashMap 对比 SparseArray,LinkedArray对比ArrayList,ArrayList对比Vector。
二、Java垃圾回收机制
需要理解JVM,内存划分——方法区、内存堆、虚拟机栈(线程私有)、本地方法栈(线程私有)、程序计数器(线程私有), 理解回收算法——标记清除算法、可达性分析算法、标记-整理算法、复制算法、分代算法,优缺点都理解下。
详细的可以看看其他同学写的 点击打开链接
三、类加载机制
这个可以结合 热修复 深入理解下。点击打开链接
四、线程和线程池,并发,锁等一系列问题
这个可以扩展下 如何自己实现一个线程池?
五、HandlerThread、IntentService理解
六、弱引用、软引用区别
七、int、Integer有什么区别
主要考值传递和引用传递问题
八、手写生产者/消费者 模式
(二)Android
一、android启动模式
需要了解下Activity栈和taskAffinity
1.Standard:系统默认,启动一个就多一个Activity实例
2.SingleTop:栈顶复用,如果处于栈顶,则生命周期不走onCreate()和onStart(),会调用onNewIntent(),适合推送消息详情页,比如新闻推送详情Activity;
3.SingleTask:栈内复用,如果存在栈内,则在其上所有Activity全部出栈,使得其位于栈顶,生命周期和SingleTop一样,app首页基本是用这个
4.SingleInstance:这个是SingleTask加强本,系统会为要启动的Activity单独开一个栈,这个栈里只有它,适用新开Activity和app能独立开的,如系统闹钟,微信的视频聊天界面不知道是不是,知道的同学告诉我下,在此谢过!
另外,SingleTask和SingleInstance好像会影响到onActivityResult的回调,具体问题大家搜下,我就不详说。
1.Intent也需要进一步了解,Action、Data、Category各自的用法和作用,还有常用的
2.Intent.FLAG_ACTIVITY_SINGLE_TOP
3.Intent.FLAG_ACTIVITY_NEW_TASK
4.Intent.FLAG_ACTIVITY_CLEAR_TOP
等等,具体看下源码吧。
二、View的绘制流程 ViewRoot -> performTraversal ( ) -> performMeasure ( ) -> performLayout ( ) -> perfromDraw ( ) -> View / ViewGroup measure ( ) -> View / ViewGroup onMeasure ( ) -> View / ViewGroup layout ( ) -> View / ViewGroup onLayout ( ) -> View / ViewGroup draw ( ) -> View / ViewGroup onDraw ( )
看下invalidate方法,有带4个参数的,和不带参数有什么区别;requestLayout触发measure和layout,如何实现局部重新测量,避免全局重新测量问题。
三、事件分发机制 -> dispatchTouchEvent ( ) -> onInterceptTouchEvent ( ) -> onTouchEvent ( ) requestDisallowInterceptTouchEvent ( boolean ) 还有 onTouchEvent ( ) 、onTouchListener 、onClickListener的先后顺序
四、消息分发机制
这个考得非常常见。一定要看源码,代码不多。带着几个问题去看:
1.为什么一个线程只有一个Looper、只有一个MessageQueue?
2.如何获取当前线程的Looper?是怎么实现的?(理解ThreadLocal)
3.是不是任何线程都可以实例化Handler?有没有什么约束条件?
4.Looper.loop是一个死循环,拿不到需要处理的Message就会阻塞,那在UI线程中为什么不会导致ANR?
5.Handler.sendMessageDelayed()怎么实现延迟的?结合Looper.loop()循环中,Message=messageQueue.next()和MessageQueue.enqueueMessage()分析。
五、AsyncTask源码分析
优劣性分析,这个网上一大堆,不重述。
六、如何保证Service不被杀死?如何保证进程不被杀死?
这两个问题我面试过程有3家公司问到。
七、Binder机制,进程通信
Android用到的进程通信底层基本都是Binder,AIDL、Messager、广播、ContentProvider。不是很深入理解的,至少ADIL怎么用,Messager怎么用,可以写写看,另外序列化(Parcelable和Serilizable)需要做对比,这方面可以看看任玉刚大神的android艺术开发探索一书。
八、动态权限适配问题、换肤实现原理
这方面看下鸿洋大神的博文吧
九、SharedPreference原理,能否跨进程?如何实现?
(三)性能优化问题
一、UI优化
a.合理选择RelativeLayout、LinearLayout、FrameLayout,RelativeLayout会让子View调用2次onMeasure,而且布局相对复杂时,onMeasure相对比较复杂,效率比较低,LinearLayout在weight>0时也会让子View调用2次onMeasure。LinearLayout weight测量分配原则。
b.使用标签
c.减少布局层级,可以通过手机开发者选项>GPU过渡绘制查看,一般层级控制在4层以内,超过5层时需要考虑是否重新排版布局。
d.自定义View时,重写onDraw()方法,不要在该方法中新建对象,否则容易触发GC,导致性能下降
e.使用ListView时需要复用contentView,并使用Holder减少findViewById加载View。
f.去除不必要背景,getWindow().setBackgroundDrawable(null)
g.使用TextView的leftDrawabel/rightDrawable代替ImageView+TextView布局
二、内存优化
主要为了避免OOM和频繁触发到GC导致性能下降
a.Bitmap.recycle(),Cursor.close,inputStream.close()
b.大量加载Bitmap时,根据View大小加载Bitmap,合理选择inSampleSize,RGB_565编码方式;使用LruCache缓存
c.使用 静态内部类+WeakReference 代替内部类,如Handler、线程、AsyncTask
d.使用线程池管理线程,避免线程的新建
e.使用单例持有Context,需要记得释放,或者使用全局上下文
f.静态集合对象注意释放
g.属性动画造成内存泄露
h.使用webView,在Activity.onDestory需要移除和销毁,webView.removeAllViews()和webView.destory()
备:使用LeakCanary检测内存泄露
三、响应速度优化
Activity如果5秒之内无法响应屏幕触碰事件和键盘输入事件,就会出现ANR,而BroadcastReceiver如果10秒之内还未执行操作也会出现ANR,Serve20秒会出现ANR 为了避免ANR,可以开启子线程执行耗时操作,但是子线程不能更新UI,因此需要Handler消息机制、AsyncTask、IntentService进行线程通信。
备:出现ANR时,adb pull data/anr/tarces.txt 结合log分析
四、其他性能优化
a.常量使用static final修饰
b.使用SparseArray代替HashMap
c.使用线程池管理线程
d.ArrayList遍历使用常规for循环,LinkedList使用foreach
e.不要过度使用枚举,枚举占用内存空间比整型大
f.字符串的拼接优先考虑StringBuilder和StringBuffer
g.数据库存储是采用批量插入+事务
(四)设计模式
1.单例模式:好几种写法,要求会手写,分析优劣。一般双重校验锁中用到volatile,需要分析volatile的原理
2.观察者模式:要求会手写,有些面试官会问你在项目中用到了吗?实在没有到的可以讲一讲EventBus,它用到的就是观察者模式
3.适配器模式:要求会手写,有些公司会问和装饰器模式、代理模式有什么区别?
4.建造者模式+工厂模式:要求会手写
5.策略模式:这个问得比较少,不过有些做电商的会问。
6.MVC、MVP、MVVM:比较异同,选择一种你拿手的着重讲就行
(五)数据结构
1.HashMap、LinkedHashMap、ConcurrentHashMap,在用法和原理上有什么差异,很多公司会考HashMap原理,通过它做一些扩展,比如中国13亿人口年龄的排序问题,年龄对应桶的个数,年龄相同和hash相同问题类似。
2.ArrayList和LinkedList对比,这个相对简单一点。
3.平衡二叉树、二叉查找树、红黑树,这几个我也被考到。
4.Set原理,这个和HashMap考得有点类似,考hash算法相关,被问到过常用hash算法。HashSet内部用到了HashMap
(六)算法
算法主要考刷题吧,去LeetCode和牛客网刷下。
(七)源码理解
项目中多多少少会用到开源框架,很多公司都喜欢问原理和是否看过源码,比如网络框架Okhttp,这是最常用的,现在Retrofit+RxJava也很流行。
一、网络框架库 Okhttp
okhttp源码一定要去看下,里面几个关键的类要记住,还有连接池,拦截器都需要理解。被问到如何给某些特定域名的url增加header,如果是自己封装的代码,可以在封装Request中可以解决,也可以增加拦截器,通过拦截器去做。
推荐一篇讲解Okhttp不错的文章
二、消息通知 EventBus
1.EventBus原理:建议看下源码,不多。内部实现:观察者模式+注解+反射
2.EventBus可否跨进程问题?代替EventBus的方法(RxBus)
三、图片加载库(Fresco、Glide、Picasso)
1.项目中选择了哪个图片加载库?为什么选择它?其他库不好吗?这几个库的区别
2.项目中选择图片库它的原理,如Glide(LruCache结合弱引用),那么面试官会问LruCache原理,进而问LinkedHashMap原理,这样一层一层地问,所以建议看到不懂的追进去看。如Fresco是用来MVC设计模式,5.0以下是用了共享内存,那共享内存怎么用?Fresco怎么实现圆角?Fresco怎么配置缓存?
四、消息推送Push
1.项目中消息推送是自己做的还是用了第三方?如极光。还有没有用过其他的?这几家有什么优势区别,基于什么原因选择它的?
2.消息推送原理是什么?如何实现心跳连接?
五、TCP/IP、Http/Https
网络这一块如果简历中写道熟悉TCP/IP协议,Http/Https协议,那么肯定会被问道,我就验证了。一般我会回答网络层关系、TCP和UDP的区别,TCP三次握手(一定要讲清楚,SYN、ACK等标记位怎样的还有报文结构都需要熟悉下),四次挥手。为什么要三次握手?DDoS攻击。为什么握手三次,挥手要四次?Http报文结构,一次网络请求的过程是怎样的?Http和Https有什么不同?SSL/TLS是怎么进行加密握手的?证书怎么校验?对称性加密算法和非对称加密算法有哪些?挑一个熟悉的加密算法简单介绍下?DNS解析是怎样的?
六、热更新、热修复、插件化(这一块要求高点,一般高级工程师是需要理解的)
了解classLoader
七、新技术
RxJava、RxBus、RxAndroid,这个在面试想去的公司时,可以反编译下他们的包,看下是不是用到,如果用到了,面试过程难免会问道,如果没有,也可以忽略,但学习心强的同学可以看下,比较是比较火的框架。
Retrofit,熟练okhttp的同学建议看下,听说结合RxJava很爽。
Kotlin
三、最后
简历首选内推方式,速度快,效率高啊! 然后可以在拉钩,boss,脉脉,大街上看看。简历上写道熟悉什么技术就一定要去熟悉它,不然被问到不会很尴尬!做过什么项目,即使项目体量不大,但也一定要熟悉实现原理!不是你负责的部分,也可以看看同事是怎么实现的,换你来做你会怎么做?做过什么,会什么是广度问题,取决于项目内容。但做过什么,达到怎样一个境界,这是深度问题,和个人学习能力和解决问题的态度有关了。大公司看深度,小公司看广度。大公司面试你会的,小公司面试他们用到的你会不会,也就是岗位匹配度。
选定你想去的几家公司后,先去一些小的公司练练,学习下面试技巧,总结下,也算是熟悉下面试氛围,平时和同事或者产品PK时可以讲得头头是道,思路清晰至极,到了现场真的不一样,怎么描述你所做的一切,这绝对是个学术性问题!
面试过程一定要有礼貌!即使你觉得面试官不尊重你,经常打断你的讲解,或者你觉得他不如你,问的问题缺乏专业水平,你也一定要尊重他,谁叫现在是他选择你,等你拿到offer后就是你选择他了。
另外,描述问题一定要慢!不要一下子讲一大堆,慢显得你沉稳、自信,而且你还有时间反应思路接下来怎么讲更好。现在开发过多依赖ide,所以会有个弊端,当我们在面试讲解很容易不知道某个方法怎么读,这是一个硬伤……所以一定要对常见的关键性的类名、方法名、关键字读准,有些面试官不耐烦会说“你到底说的是哪个?”这时我们会容易乱了阵脚。正确的发音+沉稳的描述+好听的嗓音决对是一个加分项!
最重要的是心态!心态!心态!重要事情说三遍!面试时间很短,在短时间内对方要摸清你的底子还是比较不现实的,所以,有时也是看眼缘,这还是个看脸的时代。
希望大家都能找到合适自己满意的工作!fighting!
作者:蓝精灵8091
链接:https://www.jianshu.com/p/6bace0b3761a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
软件开发
2020-07-21 23:31:00
一、数据结构与算法基础 说一下几种常见的排序算法和分别的复杂度。 用Java写一个冒泡排序算法 描述一下链式存储结构。 如何遍历一棵二叉树? 倒排一个LinkedList。 用Java写一个递归遍历目录下面的所有文件。
二、Java基础 接口与抽象类的区别? Java中的异常有哪几类?分别怎么使用? 常用的集合类有哪些?比如List如何排序? ArrayList和LinkedList内部的实现大致是怎样的?他们之间的区别和优缺点? 内存溢出是怎么回事?请举一个例子? ==和equals的区别? hashCode方法的作用? NIO是什么?适用于何种场景? HashMap实现原理,如何保证HashMap的线程安全? JVM内存结构,为什么需要GC? NIO模型,select/epoll的区别,多路复用的原理 Java中一个字符占多少个字节,扩展再问int, long, double占多少字节 创建一个类的实例都有哪些办法? final/finally/finalize的区别? Session/Cookie的区别? String/StringBuffer/StringBuilder的区别,扩展再问他们的实现? Servlet的生命周期? 如何用Java分配一段连续的1G的内存空间?需要注意些什么? Java有自己的内存回收机制,但为什么还存在内存泄露的问题呢? 什么是java序列化,如何实现java序列化?(写一个实例)? String s = new String("abc");创建了几个 String Object?
三、JVM JVM堆的基本结构。 JVM的垃圾算法有哪几种?CMS垃圾回收的基本流程? JVM有哪些常用启动参数可以调整,描述几个? 如何查看JVM的内存使用情况? Java程序是否会内存溢出,内存泄露情况发生?举几个例子。 你常用的JVM配置和调优参数都有哪些?分别什么作用? JVM的内存结构? 常用的GC策略,什么时候会触发YGC,什么时候触发FGC?
四、多线程/并发 如何创建线程?如何保证线程安全? 如何实现一个线程安全的数据结构 如何避免死锁 Volatile关键字的作用? HashMap在多线程环境下使用需要注意什么?为什么? Java程序中启动一个线程是用run()还是start()? 什么是守护线程?有什么用? 什么是死锁?如何避免 线程和进程的差别是什么? Java里面的Threadlocal是怎样实现的? ConcurrentHashMap的实现原理是? sleep和wait区别 notify和notifyAll区别 volatile关键字的作 ThreadLocal的作用与实现 两个线程如何串行执行 上下文切换是什么含义 可以运行时kill掉一个线程吗? 什么是条件锁、读写锁、自旋锁、可重入锁? 线程池ThreadPoolExecutor的实现原理?
五、Linux使用与问题分析排查 使用两种命令创建一个文件? 硬链接和软链接的区别? Linux常用命令有哪些? 怎么看一个Java线程的资源耗用? Load过高的可能性有哪些? /etc/hosts文件什么做用? 如何快速的将一个文本中所有“abc”替换为“xyz”? 如何在log文件中搜索找出error的日志? 发现磁盘空间不够,如何快速找出占用空间最大的文件? Java服务端问题排查(OOM,CPU高,Load高,类冲突) Java常用问题排查工具及用法(top, iostat, vmstat, sar, tcpdump, jvisualvm, jmap, jconsole) Thread dump文件如何分析(Runnable,锁,代码栈,操作系统线程ID关联) 如何查看Java应用的线程信息?
六、框架使用 描述一下Hibernate的三个状态? Spring中Bean的生命周期。 SpringMVC或Struts处理请求的流程。 Spring AOP解决了什么问题?怎么实现的? Spring事务的传播属性是怎么回事?它会影响什么? Spring中BeanFactory和FactoryBean有什么区别? Spring框架中IOC的原理是什么? spring的依赖注入有哪几种方式 struts工作流程 用Spring如何实现一个切面? Spring 如何实现数据库事务? Hibernate对一二级缓存的使用,Lazy-Load的理解; mybatis如何实现批量提交?
七、数据库相关 MySQL InnoDB、Mysaim的特点? 乐观锁和悲观锁的区别? 数据库隔离级别是什么?有什么作用? MySQL主备同步的基本原理。 select * from table t where size > 10 group by size order by size的sql语句执行顺序? 如何优化数据库性能(索引、分库分表、批量操作、分页算法、升级硬盘SSD、业务优化、主从部署) SQL什么情况下不会使用索引(不包含,不等于,函数) 一般在什么字段上建索引(过滤数据最多的字段) 如何从一张表中查出name字段不包含“XYZ”的所有行? MySQL,B+索引实现,行锁实现,SQL优化 Redis,RDB和AOF,如何做高可用、集群 如何解决高并发减库存问题 mysql存储引擎中索引的实现机制; 数据库事务的几种粒度; 行锁,表锁;乐观锁,悲观锁
八、网络协议和网络编程 TCP建立连接的过程。 TCP断开连接的过程。 浏览器发生302跳转背后的逻辑? HTTP协议的交互流程。HTTP和HTTPS的差异,SSL的交互流程? Rest和Http什么关系? 大家都说Rest很轻量,你对Rest风格如何理解? TCP的滑动窗口协议有什么用?讲讲原理。 HTTP协议都有哪些方法? 交换机和路由器的区别? Socket交互的基本流程? http协议(报文结构,断点续传,多线程下载,什么是长连接) tcp协议(建连过程,慢启动,滑动窗口,七层模型) webservice协议(wsdl/soap格式,与rest协议的区别) NIO的好处,Netty线程模型,什么是零拷贝
九、Redis等缓存系统/中间件/NoSQL/一致性Hash等 列举一个常用的Redis客户端的并发模型。 HBase如何实现模糊查询? 列举一个常用的消息中间件,如果消息要保序如何实现? 如何实现一个Hashtable?你的设计如何考虑Hash冲突?如何优化? 分布式缓存,一致性hash LRU算法,slab分配,如何减少内存碎片 如何解决缓存单机热点问题 什么是布隆过滤器,其实现原理是? False positive指的是? memcache与redis的区别 zookeeper有什么功能,选举算法如何进行 map/reduce过程,如何用map/reduce实现两个数据源的联合统计
十、设计模式与重构 你能举例几个常见的设计模式 你在设计一个工厂的包的时候会遵循哪些原则? 你能列举一个使用了Visitor/Decorator模式的开源项目/库吗? 你在编码时最常用的设计模式有哪些?在什么场景下用? 如何实现一个单例? 代理模式(动态代理) 单例模式(懒汉模式,并发初始化如何解决,volatile与lock的使用) JDK源码里面都有些什么让你印象深刻的设计模式使用,举例看看? 篇幅有限,这里收集了各方面的,当前公司的,还有自己收集总结的,下面的图片截取的有pdf,有如果有需要的自取.
各大公司面试题集合:
简历模板:
链接:  https://pan.baidu.com/s/1DO6XGkbmak7KIt6Y7JQqyw
提取码:fgj6
不知道会不会失效,如果失效 点击(778490892) 或者扫描下面二维码,进群获取,链接补发不过来,谢谢。
最后
欢迎大家评论区一起交流,相互提升;整理资料不易,如果喜欢文章记得点个赞哈,感谢大家支持!!!
软件开发
2020-07-14 11:51:00
关于新的activiti新团队与原有的团队重要开发人员我们罗列一下,细节如下:
Tijs Rademakers,算是activiti5以及6比较核心的leader了。现在是flowable框架的leader。
Joram Barrez 算是activiti5以及6比较核心的leader了。目前从事flowable框架开发。
Salaboy Activiti Cloud BPM leader(Activiti Cloud BPM 也就是目前的activiti7框架)
Tijs Rademakers以及Salaboy目前是两个框架的leader。
特此强调一点:activiti5以及activiti6、flowable是Tijs Rademakers团队开发的。
Activiti7是 Salaboy团队开发的。activiti6以及activiti5代码目前有 Salaboy团队进行维护。因为Tijs Rademakers团队去开发flowable框架了,所以activiti6以及activiti5代码已经交接给了 Salaboy团队(可以理解为离职之前工作交接)。目前的activiti5以及activiti6代码还是原Tijs Rademakers原有团队开发的。Salaboy团队目前在开发activiti7框架。对于activiti6以及activiti5的代码官方已经宣称暂停维护了。activiti7就是噱头 内核使用的还是activiti6。并没有为引擎注入更多的新特性,只是在activiti之外的上层封装了一些应用。
注意:activiti6的很多框架bug在flowable框架中已经修复的差不多了。
activiti5以及ativiti6的核心开发团队是Tijs Rademakers团队。activiti6最终版本由Salaboy团队发布的。
activiti6核心代码是Tijs Rademakers团队开发的,为何是Salaboy团队发布的呢?很简单,因为这个时候Tijs Rademakers团队已经去开发flowable去了。flowable是基于activiti-6.0.0.Beta4 分支开发的。下面我们截图一些flowable的发展。
目前Flowable已经修复了activiti6很多的bug,可以实现零成本从activiti迁移到flowable。
flowable目前已经支持加签、动态增加实例中的节点、支持cmmn、dmn规范。这些都是activiti6目前版本没有的。
1、flowable已经支持所有的历史数据使用mongdb存储,activiti没有。
2、flowable支持事务子流程,activiti没有。
3、flowable支持多实例加签、减签,activiti没有。
4、flowable支持httpTask等新的类型节点,activiti没有。
5、flowable支持在流程中动态添加任务节点,activiti没有。
6、flowable支持历史任务数据通过消息中间件发送,activiti没有。
7、flowable支持java11,activiti没有。
8、flowable支持动态脚本,,activiti没有。
9、flowable支持条件表达式中自定义juel函数,activiti没有。
10、flowable支持cmmn规范,activiti没有。
11、flowable修复了dmn规范设计器,activit用的dmn设计器还是旧的框架,bug太多。
12、flowable屏蔽了pvm,activiti6也屏蔽了pvm(因为6版本官方提供了加签功能,发现pvm设计的过于臃肿,索性直接移除,这样加签实现起来更简洁、事实确实如此,如果需要获取节点、连线等信息可以使用bpmnmodel替代)。工作流框架项目源码:www.1b23.com
13、flowable与activiti提供了新的事务监听器。activiti5版本只有事件监听器、任务监听器、执行监听器。
14、flowable对activiti的代码大量的进行了重构。
15、activiti以及flowable支持的数据库有h2、hsql、mysql、oracle、postgres、mssql、db2。其他数据库不支持的。使用国产数据库的可能有点失望了,需要修改源码了。
16、flowable支持jms、rabbitmq、mongodb方式处理历史数据,activiti没有。
软件开发
2020-07-14 11:50:06
随着企业之间联系的不断紧密,企业之间的系统服务也不断接受新的挑战。例如一个金融公司与一家大数据公司展开了合作,金融公司需要调用大数据公司的数据来完成自身业务的开展。那么服务之间的调用是怎么实现呢?Http可以实现,但我觉得web service 是讨论这类问题时不能避开的一个答案。
Web Service我的理解是更接近一种规范了,实现了这套规范的系统,可以互相之间交互数据已完成业务上的调用。它的特点有跨语言、跨平台,能够实现不同语言之间的互联互通。归结一句话:Web Service就是一种跨编程语言和跨操作系统平台的远程调用技术。
Web Service遵从的协议叫做SOAP协议,全称叫做 Simple Object Access Protocol,简单对象访问协议。Web Service服务传输的数据格式是XML形式组织的,通过HTTP协议发送和接收结果。因此我们可以简便的理解为:SOAP协议=HTTP协议+XML数据格式。
还有一个重要概念要提,就是WSDL( Web Services Description Language ),发布好一个web service 服务后怎么检查服务是否正常,答案就是查看WSDL文件。查看方式通常是在浏览器中打开服务的发布地址,然后后面加上 .wsdl即可。
下面我们发布个简单的服务。
服务端代码如下: import javax.jws.WebMethod; import javax.jws.WebService; import javax.xml.ws.Endpoint; @WebService public class HelloService { private String sayHello(String name) { System.out.println("hello " + name); return "Hello " + name; } @WebMethod public String sayHi(String name) { return this.sayHello(name); } public static void main(String[] args) { Endpoint.publish("http://localhost:8081/sayhi", new HelloService()); } }
代码和明晰,HelloService 是一个Web Service服务类,它对外提供了一个调用方法:sayHi。main函数中,我们用Endpoint来发布这个服务,发布方法的参数是地址、服务实例。
话不多述,启动main函数。在浏览器中输入地址:http://localhost:8081/sayhi?wsdl,我们看浏览器返回结果:
说明我们的服务发布成功,可以供外部调用了。
下来我们看外部系统如何调用这个服务。
在创建一个客户端工程。然后打开cmd命令行工具,使用jdk自带的客户端代码生成工具wsimport,生成客户端代码。生成方式如下:
我当前目录在e盘,-s表示生成源码即java文件,-p用来指定包结构。执行命令即可生成代码了。
执行后我的E盘下生成情况如图:
我们把java文件拷贝到客户端工程中去。
编写调用类ServiceClient: public class ServiceClient { public static void main(String[] args) { HelloServiceService service = new HelloServiceService(); HelloService service2 = service.getHelloServicePort(); String result = service2.sayHi("Wrold"); System.out.println(result); } }
执行查看结果: 。
需要特别注意的是wsimport那个定义包结构的参数,一定要输入正确。
软件开发
2019-04-13 16:43:00
4月15日,海信在北京举办2019年春季新品大秀,接连推出了四款电视新品,分别是:超画质电视U8、更好的OLED电视A8,AI图搜无人超越的 U7和主打线上的VIDAA电视。
与此同时,海信还放出重磅消息,他们已经研发成功一款搭载伸缩式摄像头,同时满足6路视频通话的全新社交电视产品,并已准备上市。
据介绍,海信社交电视在技术上首创6路视频通话、实时AI语音控制和4K视频播放功能,模糊了手机和电视的边界。
有了这台电视,家人、家庭之间可以随时视频互动聊天,父母与孩子可以实时交流,并支持一键切换小窗聊天模式,可以一边观看视频内容一边和家人亲友畅聊。朋友之间更是可以一起看电影追热剧,一起为喜欢的球队呐喊助威,也可以一起打够级一起k歌,并实时在电视朋友圈中分享相关内容。
截至目前,海信电视全球累计激活用户超过4200万,日活跃家庭高达1450万,内容点播超过1.28亿人次、大屏游戏143万人,每天人均人机语音交互对话25次,与家庭智慧电器交互百万次。 原文来自: http://news.mydrivers.com/1/623/623072.htm
本文地址: https://www.linuxprobe.com/hisense-developed-social-tv.html 编辑:王婷,审核员:逄增宝
软件开发
2019-05-04 21:21:00
对于那些使用Canonical的 Linux 容器LXD项目的人来说,版本3.12现在可以在本月的Ubuntu 19.04版本之前使用。LXD 3.12是此Linux Containers项目的最新功能版本,在提供集群改进方面特别重要。 LXD 3.12在集群前端提供聚合DHCP租约支持,显示位置的事件,支持更多 命令 中的--target以及其他增强功能。
除了集群之外,LXD 3.12还支持Shiftfs,允许通过Shiftfs文件系统内核补丁使用非特权容器,这些补丁还没有成为主流,但将成为Ubuntu 19.04内核的一部分。LXD现在还通过它的API导出支持的内核特性,改进CPU报告,改进GPU报告,并修复了一些开放的bug。
LXD 3.12的Snap预计将在下周为Ubuntu用户进入稳定的频道。 更多有关LXD 3.12的详细信息,请访问LinuxContainers.org。 原文来自: https://www.linuxidc.com/Linux/2019-04/157975.htm
本文地址: https://www.linuxprobe.com/lxd-312.html 编辑:清蒸github,审核员:逄增宝
软件开发
2019-05-03 14:09:00
Kafka写入数据保证不丢失: 每个partition至少有一个follower在ISR列表中,跟上了Leader的数据同步 每次写入数据时,都要求至少写入partition leader成功,还至少一个ISR里的follower写入成功,才算写入成功 如果不满足以上两个条件,就一直认为写入失败,让生产系统不断尝试,直到满足以上两个条件,才确认写入成功 根据以上三条,配置相应参数,才能保证写入kafka数据不会丢失
所以如果leader宕机,切换到那个follower上去,follower上有刚写入的数据,此时数据就不再丢失了。
软件开发
2019-05-20 17:38:00
在描述算法时通常用o(1), o(n), o(logn), o(nlogn) 来说明时间复杂度
o(1):是最低的时空复杂度,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变。 哈希算法就是典型的O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标(不考虑冲突的话)
O(n):代表数据量增大几倍,耗时也增大几倍。(n)代表输入的数据量,比如 常见的遍历算法
O(n^2):代表数据量增大n倍,时间复杂度就是n的平方倍,比如 冒泡排序,就是典型的O(n^2)的算法 ,对n个数排序,需要扫描n×n次。
O(logn):当数据增大n倍时,耗时增大logn倍(这里的log是以2为底的,比如,当数据增大256倍时,耗时只增大8倍,是比线性还要低的时间复杂度)。 二分查找就是O(logn)的算法 ,每找一次排除一半的可能,256个数据中查找只要找8次就可以找到目标
O(nlogn):同理,就是n乘以logn,当数据增大256倍时,耗时增大256*8=2048倍。这个复杂度高于线性低于平方。 归并排序就是O(nlogn)的时间复杂度 。
软件开发
2018-07-23 09:05:00
这个是apollo中的datasource的namespace.
sb项目中的配置如下: package com.miaoyouche.user.config; import com.ctrip.framework.apollo.model.ConfigChangeEvent; import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; @Slf4j @Configuration @EnableConfigurationProperties(DataSourceProperties.class) public class DataSourceConfig { @Autowired ApplicationContext context; @Autowired private org.springframework.cloud.context.scope.refresh.RefreshScope refreshScope; @ApolloConfigChangeListener("datasource1") private void onChange(ConfigChangeEvent changeEvent) { DataSourceProperties dataSourceProperties = context.getBean(DataSourceProperties.class); changeEvent.changedKeys().stream().forEach(s -> { if (s.contains("spring.datasource.password")) { dataSourceProperties.setPassword(changeEvent.getChange(s).getNewValue()); } }); refreshScope.refresh("dataSource"); } @RefreshScope @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties) { return dataSourceProperties.initializeDataSourceBuilder().build(); } }
@ApolloConfigChanageListener会监听到datasource1的namespace变化,变化后重新为DataSourceProperties的属性赋值.Apollo自身会在检测变化的时候,更新@RefreshScope的注解的bean
软件开发
2018-08-01 16:57:00
其中一位知情人士说,Alphabet的子公司正在开发自己的分布式数字分类帐本,第三方可以用它来发布和验证交易。虽然产品发布的时机还不清楚,但该公司计划提供这款产品,以便使自己的云服务区别于竞争对手。该人士补充说,谷歌同时还将提供一个白标版本,其他公司可将其安装在自己的服务器上运行。
这家互联网巨头还在收购并投资拥有数字分类帐专门知识的初创企业。许多交易尚未公布,该人士说。不过,根据研究公司CB Insights的数据,去年,Alphabet子公司已成为该领域的主要企业投资者,领先于花旗集团和高盛集团。
据另一位知情人士称,在最近几个月,谷歌基础设施团队的几个人一直在研究区块链协议,他们的顶头上司是谷歌云业务主管戴安妮·格林(Diane Greene)。其他几位不愿透露姓名的谷歌内部人士近日表示,云计算业务理所应当在区块链相关服务占有一席之地。
谷歌的一位发言人说。“像许多新技术一样,我们的几个各种团队已经在探索区块链的潜在用途,但猜测任何可能的用途或计划,目前仍为时尚早。”
2016年谷歌开始在云上位一些开发商测试区块链服务。知情人士说,该公司目前正在探索如何用更为广泛的方法采用这项技术。
数字账本
像区块链一样,数字分账式账本也在支撑着比特币和其他加密货币的发展。它们是通过因特网在数千台计算机上定期更新的数据库。每个条目都由这些机器加以确认,而这些机器可以是公共网络的一部分,也可以由企业私下运营。数字账本的种类很多,区块链只是其中一个。该技术的数据分析范围广泛,涵盖内容从交易到供应链更新,再到数字养猫。
这项技术给谷歌带来了挑战和机遇。分布式计算机网络运行的数字账本可以消除单一公司集中持有信息的风险,虽然谷歌的安全措施严密,但它毕竟是世界上最大的信息持有者之一。分散的方法也开始支持新在线服务,它们正在与谷歌展开竞争。
尽管如此,作为互联网先驱,谷歌拥有认可新的、开放的Web标准的长期经验,已经开始了解这项技术,建立自己的数字账本。不过,知情人士称,谷歌可能另一种类型,以便更容易运行数以百万计的交易。
在最近的一次会议上,谷歌的广告部主管说,他的部门有一个“小团队”正在关注区块链,但他同时指出,现有的核心技术不能快速处理大量交易。一些营销公司已经开始探索如何摆脱业内两大巨头的束缚,在不使用谷歌和Facebook技术的前提下,利用区块链的潜力来促进数字广告的发展。
当Alphabet想跟上新兴技术的发展潮流时,它所采用的方法常常是支持业内的初创公司,并进行小规模的收购来招募人才。根据CB Insights的统计,Alphabet公司下属的风险投资部门GV已经注资于钱包服务商Blockchain Luxembourg公司、金融交易网络Ripple公司、加密货币资产管理平台LedgerX、国际支付提供商Veem,以及现已解散的Buttercoin公司。
科技巨头建立自己的数字账本
到目前为止,其他科技巨头,如IBM和微软公司,正在引领潮流,可提供区块链相关工具,让一些企业利用他们的云服务来建立自己的数字账本。WinterGreen研究公司认为,区块链产品和服务的市场规模有望从去年的7.6亿美元增长到2024年的超过600亿美元。云巨头亚马逊公司正在帮助企业建立自己的区块链应用,Facebook总裁马克·扎克伯格正在密切关注加密货币、加密和其他分散的计算方法。
一批初创公司正试图利用数字账本挑战谷歌的在线优势。一份初创企业白皮书显示,Brave是一款网页浏览器,意欲调整谷歌的Chrome。Brave没有采用有针对性的广告,而是利用区块链技术,让人们为浏览网站付费。BitClave允许人们进行网上搜索,并通过观看广告获得回报。Presearch也在试图利用区块链与谷歌搜索引擎展开竞争。
“你会看到大量研发力量投入其中,令人难以置信,”风投公司纪源资本的执行合伙人杰夫·理查兹(Jeff Richards)说。“每个人都通过互联网和手机上学习,时不我待。 原文来自: http://www.alibuybuy.com/posts/90606.html
本文地址: https://www.linuxprobe.com/google-block-chain.html 编辑:尹慧慧,审核员:逄增宝
软件开发
2018-08-24 08:50:00
前言:
现有的uploadify上传是基于swf的,随着H5的普及,flash即将退出历史舞台,JEECG本着与时俱进的原则,将全面升级JEECG系统中的上传功能,采用新式上传插件plupload,此插件上传支持多种模式html5,flash,silverlight,html4,可通过配置实现优先模式,对于新版主流浏览器均可采用H5,对于不支持H5的低版本IE浏览器可走flash模式。以下讲解JEECG Online的升级。
一、升级步骤 (细节描述可忽略) :
1.下载升级文件
链接: https://pan.baidu.com/s/1SJXzPeN7qE4O6KB0O5qi2w 密码:z5eh
2.【新增文件】
增加JS/css,将plupload文件夹直接拷贝到src/main/webapp/plug-in/下
3.【修改文件】
拷贝Map.js至src/main/webapp/plug-in/tools/下,覆盖更新。
4.【修改文件】
拷贝FormHtmlUtil.java至src/main/java/org/jeecgframework/web/cgform/common/下,覆盖更新。
(细节描述:在FormHtmlUtil类中新增一个方法getFilePluploadFormHtml 详细见升级文件,并找到getFormHTML方法,修改文件类型加载的页面代码的方法为getFilePluploadFormHtml,如下) ······ }else if(cgFormFieldEntity.getShowType().equals("file")){ html=getFilePluploadFormHtml(cgFormFieldEntity);//获取实例化plupload组件的页面代码 } ······
5.【修改文件】
拷贝CgFormBuildController.java至src/main/java/org/jeecgframework/web/cgform/controller/build/下,覆盖更新。
(细节描述:在CgFormBuildController类中找到getHtmlHead方法,注掉老版JS,替换成新版,修改如下) //sb.append(""); sb.append("");
6.拷贝CgformFtlController.java至src/main/java/org/jeecgframework/web/cgform/controller/cgformftl/下,覆盖更新。
(细节描述:在CgformFtlController类中找到addorupdate方法,注掉老版JS,替换成新版,修改如下) //sb.append(""); sb.append("");
7.拷贝文件夹moblieCommon001、moblieCommon002、ui至src/main/resources/online/template/下,覆盖更新。
细节描述:这个步骤修改的有
a.新增宏文件src/main/resources/online/template/ui/uploadPltag.ftl,详细见升级文件。
b.修改tag宏文件src/main/resources/online/template/ui/tag.ftl: <#include "/online/template/ui/treetag.ftl"/> <#include "/online/template/ui/uploadPltag.ftl"/>
c.模板文件中的JS替换,即在src/main/resources/online/template下搜索 “jquery.uploadify-3.1.js” 若页面有,需要将此JS路径替换成plug-in/plupload/plupload.full.min.js,并且额外增加一个JS引入:

例如:搜索到文件src/main/resources/online/template/ui/basetag.ftl内有“jquery.uploadify-3.1.js”,代码如下: <#if hasFile==true>
现需要将其改成: <#if hasFile==true>
8.执行升级SQL update cgform_ftl SET FTL_CONTENT = replace(FTL_CONTENT, 'plug-in/uploadify/jquery.uploadify-3.1.js', 'plug-in/plupload/plupload.full.min.js');
软件开发
2018-09-07 15:54:00
package com.sky.study.delayQueue;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sky.study.ConnectionUtil;
/**
* 延时队列
*
* @author 86940
*
*/
public class DelayQueue {
private final static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Map arguments = new HashMap();
arguments.put("x-expires", 30000);// 队列过期时间
arguments.put("x-message-ttl", 12000);// 队列上消息过期时间
arguments.put("x-dead-letter-exchange", "exchange-direct");
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
channel.queueDeclare("delay_queue", true, false, false, arguments);
// 声明队列
channel.queueDeclare(queue_name, true, false, false, null);
channel.exchangeDeclare("exchange-direct", "direct");
// 绑定路由
channel.queueBind(queue_name, "exchange-direct", "message_ttl_routingKey");
String message = "hello world!" + System.currentTimeMillis();
// 设置延时属性
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 持久性 non-persistent (1) or persistent (2)
AMQP.BasicProperties properties = builder.deliveryMode(2).build();
// AMQP.BasicProperties properties =
// builder.expiration("30000").deliveryMode(2).build();// routingKey
// =delay_queue 进行转发
channel.basicPublish("", "delay_queue", properties, message.getBytes());
System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
// 关闭频道和连接
channel.close();
connection.close();
}
}
消费者代码
package com.sky.study.delayQueue;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.sky.study.ConnectionUtil;
public class Consumer {
private static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Map arguments = new HashMap();
arguments.put("x-expires", 30000);//队列过期时间
arguments.put("x-message-ttl", 12000);//队列上消息过期时间
arguments.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");//过期消息转向路由相匹配routingkey
channel.queueDeclare("delay_queue", true, false, false, arguments);

// 声明队列
channel.queueDeclare(queue_name, true, false, false, null);
channel.exchangeDeclare("exchange-direct", "direct");
// 绑定路由
channel.queueBind(queue_name, "exchange-direct", "message_ttl_routingKey");

QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
channel.basicConsume(queue_name, true, consumer);
while (true) {
// nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
}
}
}
软件开发
2018-10-15 15:24:00
apache顶级项目(九) - T~Z
https://www.apache.org/
Tajo
Apache Tajo是Apache Hadoop的强大的大数据关系和分布式数据仓库系统。Tajo专为存储在HDFS(Hadoop分布式文件系统)和其他数据源上的大型数据集上的低延迟和可扩展的即席查询,在线聚合和ETL(提取 - 转换 - 加载过程)而设计。通过支持SQL标准并利用高级数据库技术,Tajo允许跨各种查询评估策略和优化机会直接控制分布式执行和数据流。
Tapestry
面向组件的框架,用于在Java中创建高度可伸缩的Web应用程序。
Tcl
Apache Tcl是Tcl-Apache集成工作的基地。我们项目的目的是将Apache Web服务器的强大功能与成熟,强大和灵活的Tcl脚本语言的功能相结合。
Tez
ApacheTEZ®项目旨在构建一个应用程序框架,该框架允许处理数据的任务的复杂的有向非循环图。它目前在Apache Hadoop YARN上面构建。
Thrift
Apache Thrift软件框架,用于可扩展的跨语言服务开发,将软件堆栈与代码生成引擎相结合,构建可在C ++,Java,Python,PHP,Ruby,Erlang,Perl,Haskell,C#之间高效无缝地工作的服务,Cocoa,JavaScript,Node.js,Smalltalk,OCaml和Delphi等语言。
Tika
Apache Tika™工具包可从超过一千种不同的文件类型(如PPT,XLS和PDF)中检测和提取元数据和文本。所有这些文件类型都可以通过单一界面进行解析,使Tika对搜索引擎索引,内容分析,翻译等非常有用。您可以在下载页面上找到最新版本。有关如何开始使用Tika的更多信息,请参阅“入门”页面。
Tiles
适用于现代Java应用程序的免费开源模板框架。基于Composite模式,它可以简化用户界面的开发。
TinkerPop
Apache TinkerPop™是图形数据库(OLTP)和图形分析系统(OLAP)的图形计算框架。
Tomcat
TomEE
嵌入式或远程EE应用服务器|
Traffic Control
Apache Traffic Control允许您使用开源构建大规模内容交付网络。Traffic Control是围绕Apache Traffic Server构建的缓存软件,它实现了现代CDN的所有核心功能。
Traffic Server
Apache Traffic Server™软件是一种快速,可扩展且可扩展的HTTP / 1.1和HTTP / 2.0兼容的缓存代理服务器。以前是商业产品,雅虎!捐赠给了Apache Foundation,目前被几家主要的CDN和内容所有者使用。
Trafodion
事务性SQL-on-Hadoop数据库
Turbine
Apache Turbine™是一个基于servlet的框架,允许有经验的Java开发人员快速构建Web应用程序。Turbine允许您使用个性化网站并使用用户登录来限制对应用程序部分的访问。
Twill
Apache Twill是ApacheHadoop®YARN的抽象,它降低了开发分布式应用程序的复杂性,允许开发人员专注于他们的应用程序逻辑。Apache Twill允许您使用与运行线程类似的编程模型来使用YARN的分布式功能。
UIMA
非结构化信息管理应用程序(UIMA)是分析大量非结构化信息的软件系统,以便发现与最终用户相关的知识。示例UIM应用程序可以摄取纯文本并识别实体,例如人员,地点,组织;或关系,例如工作或位于。
Usergrid
您运行的BaaS框架构建应用而非服务器!无论您需要支持一个App还是一百个,Usergrid都是您的后端。像LAMP堆栈一样简单,但是专为移动设备而构建。在创纪录的时间内将应用程序投入生产并停止浪费编写服务器端代码的周期。适用于iOS,Android,HTML5 / JS,Node.js,Ruby,Java,.NET和PHP的完整SDK。自2011年开源。
VCL
VCL代表虚拟计算实验室。它是一个免费的开源云计算平台,其主要目标是为用户提供专用的自定义计算环境。计算环境的范围可以从运行生产力软件的虚拟机到运行复杂HPC仿真的强大物理服务器集群。
Velocity
Velocity是一个基于Java的模板引擎。它允许任何人使用简单但功能强大的模板语言来引用Java代码中定义的对象。
VXQuery
Apache VXQuery™将是一个用Java实现的符合标准的XML查询处理器。重点是评估对大量XML数据的查询。具体而言,目标是评估大型相对较小的XML文档集合上的查询。为了实现此目的,将在无共享机器的集群上评估查询。
Web Services
Apache Web Services项目是许多与Web服务相关的项目的所在地。
活跃项目Apache Web Services项目主动维护以下子项目: Apache Axiom一种XML和SOAP对象模型,支持延迟解析和按需构建对象树。 Apache Neethi程序员使用WS Policy的一般框架。Apache Woden用于读取,操作,创建和编写WSDL文档的Java类库。 Apache WSS4J来自OASIS Web Services Security TC的OASIS Web服务安全性(WS-Security)的实现。 Apache XmlSchema用于创建和遍历W3C XML Schema 1.0文档的Java类库。
Whimsy
Apache Whimsy项目志愿者创建了许多单独的Apache Whimsy工具,用于显示和可视化与ASF组织和流程相关的各种数据。生产Whimsy服务器还有许多工具可以自动化某些组织流程,例如将PMC成员添加到官方公司名册或阅读,更新和批准每月的董事会议程。
Wicket
Apache Wicket项目宣布了为全球网站和应用程序服务十多年的开源Java Web框架的第8个主要版本。在此版本中,Wicket完全接受Java 8习语,允许在所有正确的位置使用lambda表达式。使用Wicket 8,您可以编写更少,更快,更易维护的代码。
Xalan
Apache Xalan Project开发和维护使用XSLT标准样式表转换XML文档的库和程序。我们的子项目使用Java和C ++编程语言来实现XSLT库。
Xerces
Apache Xerces™项目负责授权Apache Software Foundation的软件,用于创建和维护:XML解析器相关软件组件
XMLBeans
XMLBeans是一种通过将XML绑定到Java类型来访问XML的技术。XMLBeans提供了几种获取XML的方法,包括:通过已编译的XML模式生成表示模式类型的Java类型。通过这种方式,您可以在“getFoo”和“setFoo”之后通过JavaBeans样式的访问器访问模式的实例。XMLBeans API还允许您通过XML Schema Object模型反映到XML模式本身。一种游标模型,您可以通过该模型遍历完整的XML信息集。支持XML DOM。
XML Graphics
Apache XML Graphics Project负责为Apache Software Foundation授权的软件,用于创建和维护:XML格式转换为图形输出相关软件组件
Apache™XML Graphics Project目前包含以下子项目,每个子项目都侧重于XML Graphics的不同方面:Apache Batik - 基于Java的可缩放矢量图形(SVG)工具包Apache FOP - 基于Java的XSL-FO(FO =格式化对象)的打印格式化程序和渲染器Apache XML Graphics Commons - 一个包含Apache Batik和Apache FOP使用的各种组件的库,用Java编写
Yetus
Apache Yetus...是一组库和工具,可以为软件项目提供贡献和发布流程。
Zeppelin
Apache Zeppelin基于Web的笔记本电脑,支持数据驱动,使用SQL,Scala等交互式数据分析和协作文档。
ZooKeeper
Apache ZooKeeper致力于开发和维护开源服务器,实现高度可靠的分布式协调。
软件开发
2018-11-27 14:04:00
动态代理模式
在介绍这个模式之前我们,先看看背景需求:
查看工资的需求:进行安全性检查,开启日志记录,(权限判断)如果有权限则查看工资,否则提示无权限。
通常的实现方式
安全性框架检查类: public class Security { public void security(){ System.out.println("checking security...."); } }
日志记录 public class Logger { public void log() { System.out.println("starting log...."); } }
权限判断 public class Privilege { private String access; public String getAccess() { return access; } public void setAccess(String access) { this.access = access; } }
目标类 public class SalaryManager { private Logger logger; private Privilege privilege; private Security security; public SalaryManager(Security security, Logger log, Privilege privilege) { super(); this.security = security; this.logger = log; this.privilege = privilege; } public void showSalary() { this.security.security(); this.logger.log(); if ("admin".equals(privilege.getAccess())) { System.out.println("这个月工资涨了10%..."); } else { System.out.println("对不起,你无权限查看工资!"); } } }
测试类 public class ShowSalaryTest { [@Test](https://my.oschina.net/azibug) public void test() { Security security = new Security(); Logger log = new Logger(); Privilege privilege = new Privilege(); privilege.setAccess("admin"); SalaryManager sm = new SalaryManager(security,log,privilege); sm.showSalary(); } }
小结:
目标类和一些公共事务耦合在一起了,而且目标类也是被固定写死了,无法做到动态执行某个目标类。其实这类公共的事务:安全性验证,日志记录和权限验证是可以被其他业务(客户)使用的,应该独立出来,达到复用的效果。
手写动态代理
首先将工资管理,即目标类用接口去封装,如下: public interface SalaryManage { // 目标动作 public void showSalary(); }
SalaryManage的实现类 public class SalaryManageImpl implements SalaryManage { [@Override](https://my.oschina.net/u/1162528) public void showSalary() { System.out.println("涨工资了。。。。"); } }
再建立目标类的代理类 /** * 目标类的代理类 * */ public class SalaryManageProxy { private Logger log; private Privilege privilege; private Security security; private SalaryManage salaryManager; public SalaryManageProxy() { super(); } public SalaryManageProxy(Logger log, Privilege privilege, Security security, SalaryManage salaryManager) { super(); this.log = log; this.privilege = privilege; this.security = security; this.salaryManager = salaryManager; } /** * 优点:添加了代理类,将目标类和公共事务分离 * 缺点:代理类的代理方法中目标类和目标方法被固定死了,无法动态变化,不可重用。 */ //代理方法 public void showSalary() { this.log.log(); this.security.security(); if ("admin".equals(privilege.getAccess())) { salaryManager.showSalary();// 目标类的目标方法 } else { System.out.println("对不起,你没有权限访问!"); } } }
测试类: public class ShowSalaryTest { /** * 通过引入代理类,将目标类和公共事务分离 */ [@Test](https://my.oschina.net/azibug) public void test() { Security security = new Security(); Logger log = new Logger(); Privilege privilege = new Privilege(); privilege.setAccess("admin"); SalaryManage sm = new SalaryManageImpl(); /** * 代理类调用代理方法,执行目标类目标方法。达到了预期效果 */ new SalaryManageProxy(log, privilege, security, sm).showSalary();; } }
小结: 优点:添加了代理类,将目标类和公共事务分离 缺点:代理类的代理方法中目标类和目标方法被固定死了,无法动态变化,不可重用。
JDK动态代理
目标类的接口 public interface SalaryManage { // 目标动作 public void showSalary(); }
目标类的实现 public class SalaryManageImpl implements SalaryManage { [@Override](https://my.oschina.net/u/1162528) public void showSalary() { System.out.println("涨工资了。。。。"); } }
拦截器(即代理类) /** * 拦截器,基于实现jdk InvocationHandler 的拦截器 * */ public class SalaryManageJDKProxy implements InvocationHandler{ private Logger log; private Privilege privilege; private Security security; private Object target; public SalaryManageJDKProxy() { super(); } public SalaryManageJDKProxy(Logger log, Privilege privilege, Security security) { super(); this.log = log; this.privilege = privilege; this.security = security; } [@Override](https://my.oschina.net/u/1162528) public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { this.security.security(); this.log.log(); if ("admin".equals(this.privilege.getAccess())) { method.invoke(this.target, args); } else { System.out.println("对不起,你没有权限访问!"); } return null; } public Object getTarget() { return target; } public void setTarget(Object target) { this.target = target; } }
测试类 public class JDKProxySalaryTest { @Test public void test() { Security security = new Security(); Logger log = new Logger(); Privilege privilege = new Privilege(); privilege.setAccess("admin"); SalaryManage sm = new SalaryManageImpl(); //拦截器 SalaryManageJDKProxy salaryManageJDKProxy = new SalaryManageJDKProxy(log, privilege, security); salaryManageJDKProxy.setTarget(sm); /** * 生成代理对象 * ClassLoader loader, 目标类的类加载器 * Class[] interfaces,目标类的接口数组 * InvocationHandler h,代理类实例 */ SalaryManage newProxyInstance = (SalaryManage) Proxy.newProxyInstance(SalaryManage.class.getClassLoader(), new Class[] { SalaryManage.class }, salaryManageJDKProxy);//代理对象,被创建的代理对象实现过了目标类的接口 newProxyInstance.showSalary(); } }
小结: 概念:目标类,代理类,拦截器 目标接口,由目标类实现目标接口 目标类和代理类实现了共同的接口
cglib代理模式
需求说明:模拟hibernate编程 开启事务 进行增删改查(目标类的目标方法) 结束事务
事务类 public class Transaction { public void beginTransaction() { System.out.println("begin transaction"); } public void commit() { System.out.println("commit"); } }
Dao类 public class PersonDao { public void updatePerson() { System.out.println("update person"); } public void addPerson() { System.out.println("add person"); } public void deletePerson() { System.out.println("delete person"); } public void listPersons() { System.out.println("list person"); } }
拦截类 import java.lang.reflect.Method; import net.sf.cglib.proxy.Enhancer; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; public class PersonDaoInterceptor implements MethodInterceptor { private Transaction transaction; private Object target; public PersonDaoInterceptor(Transaction transaction, Object target) { super(); this.transaction = transaction; this.target = target; } /** * 产生代理对象 * @return */ public Object createProxy() { Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(this.target.getClass());//设置目标类为代理类的父类 enhancer.setCallback(this);//设置拦截器为回调函数 return enhancer.create(); } @Override public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable { Object obj; String methodName = method.getName(); if ("updatePerson".equals(methodName) || "addPerson".equals(methodName) || "deletePerson".equals(methodName)) { //开启事务 this.transaction.beginTransaction(); //调用目标类的目标方法 obj = method.invoke(this.target, args); //做是否提交事务 this.transaction.commit(); } else { //调用目标类的目标方法 obj = method.invoke(this.target, args); } return obj; } public Transaction getTransaction() { return transaction; } public void setTransaction(Transaction transaction) { this.transaction = transaction; } public Object getTarget() { return target; } public void setTarget(Object target) { this.target = target; } }
测试类: import org.junit.Test; public class PersonDaoTest { @Test public void test() { Transaction transaction = new Transaction(); PersonDao personDao = new PersonDao(); PersonDaoInterceptor inteceptor = new PersonDaoInterceptor(transaction,personDao); //代理类是目标类的子类。 PersonDao proxy = (PersonDao)inteceptor.createProxy(); proxy.addPerson(); } }
总结 概念:目标类,代理类,拦截器 jdk: 目标类和代理类实现了共同的接口 拦截器必须实现jdk提供的InvocationHandler,而这个接口中的invoke方法体内容=代理对象方法体内容 当客户端用代理对象调用方法时,invoke方法执行 cglib: 目标类是代理类的父类 拦截器实现了MethodInterceptor,而接口中的intercept方法=代理对象方法体 使用字节码增强机制创建代理对象
软件开发
2018-12-24 15:57:00
简介
安全是我们开发中一直需要考虑的问题,例如做身份认证、权限限制等等。市面上比较常见的安全框架有: shiro spring security
shiro比较简单,容易上手。而spring security功能比较强大,但是也比较难以掌握。springboot集成了spring security,我们这次来学习spring security的使用。
spring security
应用程序的两个主要区域是“认证”和“授权”(或者访问控制)。这两个主要区域是Spring Security 的两个目标。 “认证”(Authentication),是建立一个他声明的主体的过程(一个“主体”一般是指用户,设备或一些可以在你的应用程序中执行动作的其他系统)。 “授权”(Authorization)指确定一个主体是否允许在你的应用程序执行一个动作的过程。为了抵达需要授权的店,主体的身份已经有认证过程建立。 这个概念是通用的而不只在Spring Security中。
Spring Security是针对Spring项目的安全框架,也是Spring Boot底层安全模块默认的技术选型。他可以实现强大的web安全控制。对于安全控制,我们仅需引入spring-boot-starter-security模块,进行少量的配置,即可实现强大的安全管理。需要注意几个类: WebSecurityConfigurerAdapter:自定义Security策略 AuthenticationManagerBuilder:自定义认证策略 @EnableWebSecurity:开启WebSecurity模式
测试使用
搭建基本测试环境 引入thymeleaf和security场景启动器 org.springframework.boot spring-boot-starter-security org.springframework.boot spring-boot-starter-thymeleaf 编写几个简单的html页面,我们将其分别放在不同的模板文件夹子目录user,admin以及super中,预备给我们的3种不同的角色访问适用。 ++template ----index.html ++++user ------user1.html ------user2.html ------user3.html ++++admin ------admin1.html ------admin2.html ------admin3.html ++++super ------super1.html ------super2.html ------super3.html
每个html都写一点简单的内容,类似于this is xxxx.html。例如admin/admin3.html的内容如下: admin-3 this is admin-3 file. 为了方便查看,你也可以将title标签体内容修改为一致的名称,如admin-3 编写controller,对我们的访问路径进行映射: package com.example.dweb.controller; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; @Controller public class IndexController { @GetMapping("/") public String user1(){ return "/index"; } @GetMapping("/user/user1") public String user1(){ return "/user/user1"; } @GetMapping("/user/user2") public String user2(){ return "/user/user2"; } @GetMapping("/user/user3") public String user3(){ return "/user/user3"; } @GetMapping("/admin/admin1") public String admin1(){ return "/admin/admin1"; } @GetMapping("/admin/admin2") public String admin2(){ return "/admin/admin2"; } @GetMapping("/admin/admin3") public String admin3(){ return "/admin/admin3"; } @GetMapping("/super/super1") public String super1(){ return "/super/super1"; } @GetMapping("/super/super2") public String super2(){ return "/super/super2"; } @GetMapping("/super/super3") public String super3(){ return "/super/super3"; } } 运行项目,测试我们对各个页面的访问是否正常。在运行项目之前,先在pom文件中将spring-security场景启动器删除,避免security对我们进行访问拦截: 单元测试 --> Code Review --> 功能测试 --> 性能测试 --> 上线 --> 运维、Bug修复 --> 测试上线 --> 退休下线。开发到上线的时间也许是几周或者几个月,但是线上运维、bug修复的周期可以是几年。
在这几年的时间里面,几乎不可能还是原来的作者在维护了。继任者如何能理解之前的代码逻辑是极其关键的,如果不能维护,只能自己重新做一套。所以在项目中我们经常能见到的情况就是,看到了前任的代码,都觉得这是什么垃圾,写的乱七八糟,还是我自己重写一遍吧。就算是在开发的过程中,需要别人来Code Review,如果他们都看不懂这个代码,怎么来做Review呢。还有你也不希望在休假的时候,因为其他人看不懂你的代码,只好打电话求助你。这个我印象极其深刻,记得我在工作不久的时候,一次回到了老家休假中,突然同事打电话来了,出现了一个问题,问我该如何解决,当时电话还要收漫游费的,非常贵,但是我还不得不支持直到耗光我的电话费。
所以代码主要还是写给人看的,是我们的交流的途径。那些非常好的开源的项目虽然有文档,但是更多的我们其实还是看他的源码,如果开源项目里面的代码写的很难读,这个项目也基本上不会火。因为代码是我们开发人员交流的基本途径,甚至可能口头讨论不清楚的事情,我们可以通过代码来说清楚。代码的可读性我觉得是第一位的。各个公司估计都有自己的代码规范,遵循相关的规范保持代码风格的统一是第一步(推荐谷歌代码规范[1]和微软代码规范[2])。规范里一般都包括了如何进行变量、类、函数的命名,函数要尽量短并且保持原子性,不要做多件事情,类的基本设计的原则等等。另外一个建议是可以多参考学习一下开源项目中的代码。
KISS (Keep it simple and stupid)
一般大脑工作记忆的容量就是5-9个,如果事情过多或者过于复杂,对于大部分人来说是无法直接理解和处理的。通常我们需要一些辅助手段来处理复杂的问题,比如做笔记、画图,有点类似于在内存不够用的情况下我们借用了外存。
学CS的同学都知道,外存的访问速度肯定不如内存访问速度。另外一般来说在逻辑复杂的情况下出错的可能要远大于在简单的情况下,在复杂的情况下,代码的分支可能有很多,我们是否能够对每种情况都考虑到位,这些都有困难。为了使得代码更加可靠,并且容易理解,最好的办法还是保持代码的简单,在处理一个问题的时候尽量使用简单的逻辑,不要有过多的变量。
但是现实的问题并不会总是那么简单,那么如何来处理复杂的问题呢?与其借用外存,我更加倾向于对复杂的问题进行分层抽象。网络的通信是一个非常复杂的事情,中间使用的设备可以有无数种(手机,各种IOT设备,台式机,laptop,路由器,交换机...), OSI协议对各层做了抽象,每一层需要处理的情况就都大大地简化了。通过对复杂问题的分解、抽象,那么我们在每个层次上要解决处理的问题就简化了。其实也类似于算法中的divide-and-conquer, 复杂的问题,要先拆解掉变成小的问题,从而来简化解决的方法。
KISS还有另外一层含义,“如无必要,勿增实体” (奥卡姆剃刀原理)。CS中有一句 "All problems in computer science can be solved by another level of indirection", 为了系统的扩展性,支持将来的一些可能存在的变化,我们经常会引入一层间接层,或者增加中间的interface。在做这些决定的时候,我们要多考虑一下是否真的有必要。增加额外的一层给我们的好处就是易于扩展,但是同时也增加了复杂度,使得系统变得更加不可理解。对于代码来说,很可能是我这里调用了一个API,不知道实际的触发在哪里,对于理解和调试都可能增加困难。
KISS本身就是一个trade off,要把复杂的问题通过抽象和分拆来简单化,但是是否需要为了保留变化做更多的indirection的抽象,这些都是需要仔细考虑的。
DRY (Don't repeat yourself)
为了快速地实现一个功能,知道之前有类似的,把代码copy过来修改一下就用,可能是最快的方法。但是copy代码经常是很多问题和bug的根源。有一类问题就是copy过来的代码包含了一些其他的逻辑,可能并不是这部分需要的,所以可能有冗余甚至一些额外的风险。
另外一类问题就是在维护的时候,我们其实不知道修复了一个地方之后,还有多少其他的地方还需要修复。在我过去的项目中就出现过这样的问题,有个问题明明之前做了修复,过几天另外一个客户又提了类似的问题出现的另外的路径上。相同的逻辑要尽量只出现在一个地方,这样有问题的时候也就可以一次性地修复。这也是一种抽象,对于相同的逻辑,抽象到一个类或者一个函数中去,这样也有利于代码的可读性。
是否要写注释
个人的观点是大部分的代码尽量不要注释。代码本身就是一种交流语言,并且一般来说编程语言比我们日常使用的口语更加的精确。在保持代码逻辑简单的情况下,使用良好的命名规范,代码本身就很清晰并且可能读起来就已经是一篇良好的文章。特别是OO的语言的话,本身object(名词)加operation(一般用动词)就已经可以说明是在做什么了。重复一下把这个操作的名词放入注释并不会增加代码的可读性。并且在后续的维护中,会出现修改了代码,却并不修改注释的情况出现。在我做的很多Code Review中我都看到过这样的情况。尽量把代码写的可以理解,而不是通过注释来理解。
当然我并不是反对所有的注释,在公开的API上是需要注释的,应该列出API的前置和后置条件,解释该如何使用这个API,这样也可以用于自动产品API的文档。在一些特殊优化逻辑和负责算法的地方加上这些逻辑和算法的解释还是非常有必要的。
一次做对,不要相信以后会Refactoring
通常来说在代码中写上TODO,等着以后再来refactoring或者改进,基本上就不会再有以后了。我们可以去我们的代码库里面搜索一下TODO,看看有多少,并且有多少是多少年前的,我相信这个结果会让你很惊讶(欢迎大家留言分享你查找之后的结果)。 尽量一次就做对,不要相信以后还会回来把代码refactoring好。人都是有惰性的,一旦完成了当前的事情,move on之后再回来处理这些概率就非常小了,除非下次真的需要修改这些代码。如果说不会再回来,那么这个TODO也没有什么意义。如果真的需要,就不要留下这个问题。我见过有的人留下了一个TODO,throw了一个not implemented的exception,然后几天之后其他同学把这个代码带上线了,直接挂掉的情况。尽量不要TODO, 一次做好。
是否要写单元测试?
个人的观点是必须,除非你只是做prototype或者快速迭代扔掉的代码。 Unit tests are typically automated tests written and run by software developers to ensure that a section of an application (known as the "unit") meets its design and behaves as intended. In procedural programming, a unit could be an entire module, but it is more commonly an individual function or procedure. In object-oriented programming, a unit is often an entire interface, such as a class, but could be an individual method. From Wikipedia
单元测试是为了保证我们写出的代码确实是我们想要表达的逻辑。当我们的代码被集成到大项目中的时候,之后的集成测试、功能测试甚至e2e的测试,都不可能覆盖到每一行的代码了。如果单元测试做的不够,其实就是在代码里面留下一些自己都不知道的黑洞,哪天调用方改了一些东西,走到了一个不常用的分支可能就挂掉了。我之前带的项目中就出现过类似的情况,代码已经上线几年了,有一次稍微改了一下调用方的参数,觉得是个小改动,但是上线就挂了,就是因为遇到了之前根本没有人测试过的分支。单元测试就是要保证我们自己写的代码是按照我们希望的逻辑实现的,需要尽量的做到比较高的覆盖,确保我们自己的代码里面没有留下什么黑洞。关于测试,我想单独开一篇讨论,所以就先简单聊到这里。
要写好代码确实是已经非常不容易的事情,需要考虑正确性、可读性、鲁棒性、可测试性、可以扩展性、可以移植性、性能。前面讨论的只是个人觉得比较重要的入门的一些点,想要写好代码需要经过刻意地考虑和练习才能真正达到目标!
最后
欢迎各位技术同路人加入阿里云云监控(CloudMonitor)团队,我们专注于解决云上服务和资源的可观测性问题,并和云上的运维工具进行整合,致力于为企业、开发者提供一站式的智能监控运维服务,内推直达邮箱:guodong.chen@alibaba-inc.com 相关链接
[1] https://google.github.io/styleguide/ [2] https://docs.microsoft.com/en-us/dotnet/standard/design-guidelines/
软件开发
2020-08-04 15:31:00
原文链接:http://www.cnblogs.com/xhb-bky-blog/p/9051380.html
导图
下图是我结合自己的经验以及搜集整理的数据库优化相关内容的思维导图。




常用关键字优化
在编写T-SQL的时候,会使用很多功能类似的关键字,比如COUNT和EXISTS、IN和BETWEEN AND等,我们往往会根据需求直奔主题地来编写查询脚本,完成需求要求实现的业务逻辑即可,但是,我们编写的脚本中却存在着很多的可优化的空间。
EXISTS代替COUNT或IN
不要在子查询中使用COUNT()执行存在性检查,不要使用类似于如下这样的语句:
SELECT COLUMN_LIST FROM TABLENAME WHERE 0 < (SELECT COUNT(*) FROM TABLE2 WHERE ..)
而应该采用这样的语句代替:
SELECT COLUMN_LIST FROM TABLENAME WHERE EXISTS(SELECT COLUMN_LIST FROM TABLE2 WHERE ...)
当你使用COUNT()时,SQL SERVER不知道你要做的是存在性检查,它会计算所有匹配的值,要么会执行全表扫描,要么会扫描最小的非聚集索引。当你使用EXISTS时,SQL SERVER知道你要执行存在性检查,当它发现第一个匹配的值时,就会返回TRUE,并停止查询。此外,很多时候用EXISTS代替IN是一个好的选择,例如:
SELECT NUM FROM A WHERE NUM IN (SELECT NUM FROM B) 可以使用SELECT NUM FROM A WHERE EXISTS (SELECT 1 FROM B WHERE NUM=A.NUM)进行替代。
尽量不用 SELECT
绝大多数情况下,不要用 *来代替查询返回的字段列表,用 *的好处是代码量少,就算是表结构或视图的列发生变化,编写的查询SQL语句也不用变,都返回所有的字段。但数据库服务器在解析时,如果碰到 *,则会先分析表的结构,然后把表的所有字段名再罗列出来,这就增加了分析的时间。另一个问题是,SELECT *可能包含了不需要的列,增加了网络流量。如果在视图创建中使用了SELECT *,在后期如果有对视图基表的表结构进行了更改,当查询视图时,可能会生成意外结果,除非重建视图或利用SP_REFRESHVIEW更新视图的元数据。
慎用 SELECT DISTINCT
DISTINCT子句仅在特定功能的时候使用,即从记录集中排除重复记录的时候。这是因为DISTINCT子句先获取结果,进行排序集然后再去重,这样增加了SQL SERVER资源的消耗。在实际的业务中,如果你已经预先知道SELECT语句将从不返回重复记录,那么使用DISTINCT语句是对SQL SERVER资源不必要的浪费。当然,如果是符合特定的业务场景,是可以酌情使用的。
正确使用 UNION 和 UNION ALL 以及 WITH TEMPTABLENAME AS
许多人没完全理解UNION和UNION ALL是怎样工作的,因此,结果浪费了大量不必要的SQL Server资源。当使用UNION时,它相当于在结果集上执行SELECT DISTINCT。换句话说,UNION将联合两个相类似的记录集,然后搜索重复的记录并排除。如果这是你的目的,那么使用UNION是正确的。但如果你使用UNION联合的两个记录集本身就没有重复记录,那么使用UNION会浪费资源,因为它要寻找重复记录,即使你确定它们不存在。总而言之,联合无重复的结果集采用UNION ALL,联合存在重复记录的采用UNION。对于WITH TEMP TABLENAME AS,其实并没有建立临时表,只是子查询部分(SUBQUERY FACTORING),定义一个SQL片断,该SQL片断会被整个SQL语句所用到。有的时候,是为了让SQL语句的可读性更高些,也有可能是在UNION ALL的不同部分,作为提供数据的部分。特别对于UNION ALL比较有用。因为UNION ALL的每个部分可能相同,但是如果每个部分都去执行一遍的话成本太高,所以可以使用WITH AS短语,则只要执行一遍即可。
使用 SET NOCOUNT ON 选项
缺省地,每次执行SQL语句时,一个消息会从服务端发给客户端以显示SQL语句影响的行数。这些信息对客户端来说很少有用,甚至有些客户端会把这些信息当成错误信息处理。通过关闭这个缺省值,你能减少在服务端和客户端的网络流量,帮助全面提升服务器和应用程序的性能。为了关闭存储过程级的这个特点,在每个存储过程的开头包含SET NOCOUNT ON语句。同样,为减少在服务端和客户端的网络流量,生产环境中应该去掉存储过程中那些在调试过程中使用的SELECT和PRINT语句。
指定字段别名
当在SQL语句中连接多个表时,可以将表名或别名加到每个COLUMN前面,这样可以有效地减少解析的时间并减少那些由COLUMN歧义引起的语法错误。例如:
SELECT COLUMN_A,COLUMN_B FROM TABLE1 T1 INNER JOIN TABLE2 T2 ON T1.ID = T2.UID,其中COLUMN_A是TABLE1的数据列,COLUMN_B是TABLE2的数据列,这并不妨碍查询的进行,但是改成下列语句是不是更好呢?SELECT T1.COLUMN_A,T2.COLUMN_B FROM TABLE T1 INNER JOIN TABLE T2 ON T1.ID = T2.UID
建立索引
关于索引,下图展示出了索引的直观结构:



索引按照索引的类型可以分为聚集索引和非聚集索引,一张数据表只能存在一个聚集索引,但可以建立若干非聚集索引,聚集索引通常是建立在主键上,当然主键上不一定需要强制建立聚集索引。关于索引的实现原理可以参考这篇篇,以及。对于聚集索引而言,表中存储的数据按照索引的顺序存储,即逻辑顺序决定了表中相应行的物理顺序。对于非聚集索引,一般考虑在下列情形下使用非聚集索引:使用JOIN的条件字段、使用GROUP BY的字段、完全匹配的WHERE条件字段、外键字段等等。索引是有900字节大小限制的,因此不要在超长字段上建立索引,索引字段的总字节数不要超过900字节,否则插入的数据达到900字节时会报错。另外,并不是所有索引对查询都有效,SQL是根据表中数据来进行查询优化的,当索引列有大量数据重复时,SQL查询可能不会去利用索引,如一表中有字段Gender,Male、Female几乎各一半,那么即使在Gender上建了索引也对查询效率起不了作用。索引并不是越多越好,索引固然可以提高查询效率,但同时也降低了插入数据及更新数据的效率,因为插入或更新数据时有可能会重建索引,所以在建立索引时需要慎重考虑,视具体情况而定。总之,要根据实际的业务情景合理地为数据表建立索引。
存储过程
存储过程是数据库中的一个重要对象。存储过程实际上是对一些SQL脚本的有逻辑地组合而形成的,是一组为了完成特定功能的SQL 语句集。存储在数据库中,经过第一次编译后再次调用不需要再次编译,所以使用存储过程可提高数据库执行速度,用户通过指定存储过程的名字并给出参数(如果该存储过程带有参数)来执行它。存储过程执行计划能够重用,驻留在SQL SERVER内存的缓存里,减少服务器开销。当业务相对复杂的时候,可以将该业务封装成一个存储过程存储在数据库服务器,可以大大降低网络流量的传输,提高性能。例如,通过网络发送一个存储过程调用,而不是发送500行的T-SQL,这样速度会更快,资源占用更少,有效地避免了每次执行SQL时,都会执行解析SQL语句、估算索引的利用率、绑定变量、读取数据块等工作。存储过程可有效地降低数据库连接次数,当对数据库进行复杂操作时(如对多个表进行 Update,Insert,Query,Delete操作时),可将该复杂操作用存储过程封装起来与数据库提供的事务处理结合一起使用。这些操作,如果用程序来完成,就变成了一条条的 SQL 语句,可能要多次连接数据库。而采用成存储过程,只需要连接一次数据库就可以了。
事务和锁
事务是数据库应用中重要的工具,它具有原子性、一致性、隔离性、持久性这四个属性,很多操作我们都需要利用事务来保证数据的正确性。在使用事务中我们需要做到尽量避免死锁、尽量减少阻塞。开发过程中,可以通过以下几种方式来避免问题的产生:事务操作过程要尽量小,能拆分的事务要拆分开来,在更细的粒度上应用事务;事务操作过程中不应该有交互,因为交互等待的时候,事务并未结束,可能锁定了很多资源;事务操作过程要按同一顺序访问对象,比如在某一事务中要按顺序更新A、B两表,那么在其他的事务中就不要按B、A的顺序去更新这两个表。我在实际工作中就遇到过这种问题(如下图所示),由于在事务中需要同时更新主表和子表,子表的数据更新后主表汇总数据,但是更新两个表的时候,顺序不一致,由于事务的原子性,需要在同一事务中完成两表的更新操作,这就形成了Transaction A需要的资源(子表B)被Transaction B占据着,Transaction B需要的资源(主表A)被Transaction A占据着,导致表被锁住,造成了死锁,后来对表的更新顺序进行了调整,解决了这个问题。尽量不要指定锁类型和索引,SQL SERVER允许我们自己指定语句使用的锁类型和索引,但是一般情况下,SQL SERVER优化器选择的锁类型和索引是在当前数据量和查询条件下是最优的,我们指定的可能只是在目前情况下更优,但是数据量和数据分布在将来是会变化的。
SARG WHERE条件



下面是百度百科对SARG的解释: SARG (Searchable Arguments)操作,用于限制搜索的一个操作,它通常是指一个特定的匹配,一个值的范围内的匹配或者两个以上条件的AND连接。

SARG来源于Search Argument(搜索参数)的首字母拼成的SARG,它是指WHERE子句里,列和常量的比较。如果WHERE子句是SARGABLE(可SARG的),这意味着它能利用索引加速查询的完成。如果WHERE子句不是可SARG的,这意味着WHERE子句不能利用索引(或至少部分不能利用),执行的是全表或索引扫描,这会引起查询的性能下降。
在WHERE子句中,可以SARG的搜索条件包含以下如:包含以下操作符=、>、<、>=、<=、BETWEEN及部分情况下的LIKE(通配符在查询关键字之后,如LIKE 'A%')
在WHERE子句中,不可SARG的搜索条件如:IS NULL, <>, !=, !>, !<, NOT, NOT EXISTS, NOT IN, NOT LIKE和LIKE '%500',通常(但不总是)会阻止查询优化器使用索引执行搜索。另外在列上使用包括 函数 的表达式、两边都使用相同列的表达式、或和一个列(不是常量)比较的表达式,都是不可SARG的。并不是每一个不可SARG的WHERE子句都注定要全表扫描。如果WHERE子句包括两个可SARG和一个不可SARG的子句,那么至少可SARG的子句能使用索引(如果存在的话)帮助快速访问数据。
大多数情况下,如果表上有包括查询里所有SELECT、JOIN、WHERE子句用到的列的覆盖索引,那么覆盖索引能够代替全表扫描去返回查询的数据,即使它有不可SARG的WHERE子句。某些情况下,可以把不可SARG的WHERE子句重写成可SARG的子句。例如:
WHERE SUBSTRING(FirstName,1,1) = 'M'可以写成:WHERE FirstName LIKE 'M%' 这两个WHERE子句有相同的结果,但第一个是不可SARG的(因为使用了函数)将运行得慢些,而第二个是可SARG的,将运行得快些。如果你不知道特定的WHERE子句是不是可SARG的,可以在查询分析器里检查查询执行计划。这样做,你能很快地知道查询是使用了索引还是全表扫描来返回的数据。仔细分析,许多不可SARG的查询能写成可SARG的查询,从而实现性能的优化和提升。
查询条件中使用了不等于操作符(<>, !=)的SELECT语句执行效率较低,因为不等于操作符会限制索引,引起全表扫描,即使被比较的字段上有索引,这时可以通过把不等于操作符改成OR,可以使用索引,从而避免全表扫描。例如, 可以把SELECT TOP 100 AGE FROM TABLE WHERE AGE <> 25改写为SELECT TOP 1000 AGE FROM TABLE WHERE AGE > 25 OR AGE < 25
应当尽量避免在WHERE子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描。例如:SELECT ID FROM TABLE WHERE SUBSTRING(NAME, 1, 3) = 'ABC'
临时表和表变量
在复杂系统中,如果业务是以存储过程的方式组织的,那么中间必然会产生一些临时查询出的数据,此时临时表和表变量很难避免,关于临时表和表变量的用法,需要注意的是,如果语句很复杂,连接太多,可以考虑用临时表和表变量分步完成,将需要的结果集存储在临时表或表变量中,便于复用;同样地,如果需要多次用到一个大表的同一部分数据,考虑用临时表和表变量暂存这部分数据;如果需要综合多个表的数据,形成一个结果集,可以考虑用临时表和表变量分步汇总出这多个表的数据;其他情况下,应该控制临时表和表变量的使用。另外,在临时表完成自身功能后,要显式地删除临时表,先TRUNCATE TABLE,然后DROP TABLE,以避免资源的占用。关于临时表和表变量的选择,很多说法是表变量储存在内存,速度快,应该首选表变量,但是在实际使用中发现,这个选择主要是考虑需要放在临时表中的数据量,在数据量较多的情况下,临时表的速度反而更快。关于临时表的创建,使用SELECT INTO和CREATE TABLE + INSERT INTO的选择,我们做过测试,一般情况下,SELECT INTO会比CREATE TABLE + INSERT INTO的方法快很多,但是SELECT INTO会锁定TEMPDB的系统表SYSOBJECTS、SYSINDEXES、SYSCOLUMNS,在多用户并发环境下,容易阻塞其他进程,所以建议在并发系统中,尽量使用CREATE TABLE + INSERT INTO,而大数据量的单个语句使用中,使用SELECT INTO。
临时表和表变量是有区别的,表变量是存储在内存中的,当用户在访问表变量的时候,SQL SERVER是不产生日志的,而在临时表中是产生日志的;在表变量中,是不允许有非聚集索引的;表变量是不允许有DEFAULT默认值,也不允许有约束;临时表上的统计信息是健全而可靠的,但是表变量上的统计信息是不可靠的;临时表中是有锁的机制,而表变量中就没有锁的机制。了解二者的区别,可以针对特定场景选择最优方案,使用表变量主要需要考虑的就是应用程序对内存的压力,如果代码的运行实例很多,就要特别注意内存变量对内存的消耗。对于较小的数据或者是通过计算出来的数据推荐使用表变量。如果数据的结果比较大,在代码中用于临时计算,在选取的时候没有什么分组或聚合,也可以考虑使用表变量。一般对于大的数据结果集,或者因为统计出来的数据为了便于更好的优化,我们就推荐使用临时表,同时还可以创建索引,由于临时表是存放在Tempdb中,一般默认分配的空间很少,需要对Tempdb进行调优,增大其存储的空间。
结语
欢迎大家关注公众号:程序员一凡,获取软件测试大厂面试资料。
软件开发
2020-08-05 21:17:00
token验证,记得当年struts/struts2的时候,现在页面设置token,然后在
action进行验证,而在springmvc下,有更简单的方法

具体实现如下:
/**
* 防止重复提交
* @author 傻根她弟
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
@Order(Ordered.LOWEST_PRECEDENCE)
public @interface Token {

}

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindingResult;
import cn.he.annotation.Token;
import com.alibaba.fastjson.JSON;
@Aspect
@Component
public class TokenAspect {
private static final Logger logger = LoggerFactory.getLogger("TokenAspect");
@Before("within(@org.springframework.web.bind.annotation.RestController *) && @annotation(token)")
public void requestToken(final JoinPoint joinPoint, Token token) throws RequestTokenException {
try {
Object[] args = joinPoint.getArgs();

if(this.repeatDataValidator(args)){
throw new Exception("请勿重复提交数据");
}

} catch (RequestTokenException e) {
throw e;
} catch (Exception e) {
logger.error("发生异常: ", e);
}
}


/**
* 验证同一个url数据是否相同提交 ,相同返回true
* @param httpServletRequest
* @return
*/
private boolean repeatDataValidator(Object[] args)
{
HttpServletRequest request = null;
List new_obj = new ArrayList();

for (int i = 0; i < args.length; i++) {
if (args[i] instanceof HttpServletRequest) {
request = (HttpServletRequest) args[i];
break;
}
}

for (int i = 0; i < args.length; i++) {
if (args[i] instanceof HttpServletRequest) {
continue;
}
if (args[i] instanceof BindingResult) {
continue;
}
new_obj.add(args[i]);
}

if (request == null) {
throw new RequestTokenException("0","调用Token注释,方法中缺失HttpServletRequest参数");
}

String params= this.toJson(new_obj);
String url=request.getRequestURI();
Map map=new HashMap();
map.put(url, params);
String nowUrlParams=map.toString();//
Object preUrlParams=request.getSession().getAttribute("_req_repeatData");
if(preUrlParams==null)//如果上一个数据为null,表示还没有访问页面
{
request.getSession().setAttribute("_req_repeatData", nowUrlParams);
return false;
}
else//否则,已经访问过页面
{
if(preUrlParams.toString().equals(nowUrlParams))//如果上次url+数据和本次url+数据相同,则表示城府添加数据
{
return true;
}
else//如果上次 url+数据 和本次url加数据不同,则不是重复提交
{
request.getSession().setAttribute("_req_repeatData", nowUrlParams);
return false;
}

}
}

/**
* Object转成JSON数据
*/
private String toJson(Object object){
if(object instanceof Integer || object instanceof Long || object instanceof Float ||
object instanceof Double || object instanceof Boolean || object instanceof String){
return String.valueOf(object);
}
return JSON.toJSONString(object);
}
}

简单明了,前台页面什么也不用加,只需要在controller对应的方法上加上注解@Token该方法就生效。不过前提是TokenAspect 要生效,至于怎么让它生效,懂点spring的应该都会设置,此处就不再赘述了
软件开发
2019-09-03 14:47:00
ThreadPoolExecutor ThreadPoolExecutor = new ThreadPoolExecutor( 5 , 20 , 60 ,TIMEUNIT。 SECONDS , new ArrayBlockingQueue <可运行>( 3 ), new ThreadPoolExecutor.AbortPolicy());
软件开发
2020-07-21 22:24:00