博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mysql binlog解析概要
阅读量:7202 次
发布时间:2019-06-29

本文共 4409 字,大约阅读时间需要 14 分钟。

1,dump协议:

根据数据库的ip+port创建socket,如果创建成功,说明链接建立成功,接下来是使用dump协议订阅binlog

链接建立成功之后,服务端会主动向客户端发送如下问候信息greeting(可以理解为经java转换后,是一个java对象),

在下面的代码中可以看到greeting中的信息:

this.context.setServerStatus(greeting.getServerStatus());// this.context.setServerVersion(greeting.getServerVersion().toString()); this.context.setServerCollation(greeting.getServerCollation()); this.context.setServerCapabilities(greeting.getServerCapabilities()); this.context.setThreadId(greeting.getThreadId()); this.context.setProtocolVersion(greeting.getProtocolVersion()); this.context.setScramble(greeting.getScramble1().toString() + greeting.getScramble2().toString()); 然后如下: 将ctx=this.context;
final XSerializer s = new XSerializer(64); s.writeInt(buildClientCapabilities(), 4); s.writeInt(this.maximumPacketLength, 4); s.writeInt(this.clientCollation > 0 ? this.clientCollation : ctx.getServerCollation(), 1); s.writeBytes((byte)0, 23); // Fixed, all 0 s.writeNullTerminatedString(StringColumn.valueOf(this.user.getBytes(this.encoding))); s.writeInt(20, 1); // the length of the SHA1 encrypted password s.writeBytes(MySQLUtils.password41OrLater(this.password.getBytes(this.encoding), ctx.getScramble().getBytes(this.encoding))); if(this.initialSchema != null) s.writeNullTerminatedString(StringColumn.valueOf(this.initialSchema.getBytes(this.encoding))); //transport可以理解为是一个socket的包装后的东西 final RawPacket request = new RawPacket(); request.setSequence(1); request.setPacketBody(s.toByteArray()); request.setLength(request.getPacketBody().length); transport.getOutputStream().writePacket(request); transport.getOutputStream().flush(); 然后可以在下面的代码中得到mysql的相应:
final Packet response = transport.getInputStream().readPacket(); if(response.getPacketBody()[0] == ErrorPacket.PACKET_MARKER) {
final ErrorPacket error = ErrorPacket.valueOf(response); LOGGER.info("login failed, user: {}, error: {}", this.user, error); throw new TransportException(error); } else if(response.getPacketBody()[0] == OKPacket.PACKET_MARKER) {
final OKPacket ok = OKPacket.valueOf(response); LOGGER.info("login successfully, user: {}, detail: {}", this.user, ok); } else {
LOGGER.warn("login failed, unknown packet: ", response); throw new RuntimeException("assertion failed, invalid packet: " + response); } 2,dump报文格式:

下方表格来自mysql官网链接:http://dev.mysql.com/doc/internals/en/mysql-packet.html

Type Name Description
payload_length Length of the payload. The number of bytes in the packet beyond the initial 4 bytes that make up the packet header.
sequence_id Sequence ID
payload [len=] payload of the packet
报文对超大packet的支持方式如下:

当一个packet过大 (超过1<<24-1byte ~= 16 MB) 时, 传输需要对packet进行切割, 参看

注意, 在A上生成binlog时, 是可以容纳大于16MB的packet的, 也就是原binlog里存在超大的event, 需要在传输时加以限制

切割packet没什么特别之处, 仅需要注意包格式, 一个20MB的event的传输packet格式举例为 (此处用16MB便于描述, 应为1<<24-1byte):

packet 1        4字节 packet header        1字节 值为[00], 是binlog event的特征标志 16MB-1字节 为第一段数据 packet 2 4字节 packet header 20MB-16MB+1字节 为第二段数据

需要注意的是之后的packet时不带有[00]特征位的. 而包的大小计算范围为除去前4字节的全部字节

上面的讲解对应的代码为:

// Parse packet final int packetLength = is.readInt(3); final int packetSequence = is.readInt(1);//超大数据包切割后的顺序 is.setReadLimit(packetLength); // Ensure the packet boundary // final int packetMarker = is.readInt(1);//特征位 if(packetMarker != OKPacket.PACKET_MARKER) { // 0x00    if((byte)packetMarker == ErrorPacket.PACKET_MARKER) {
final ErrorPacket packet = ErrorPacket.valueOf(packetLength, packetSequence, packetMarker, is); throw new RuntimeException(packet.toString()); } else if((byte)packetMarker == EOFPacket.PACKET_MARKER) {
final EOFPacket packet = EOFPacket.valueOf(packetLength, packetSequence, packetMarker, is); throw new RuntimeException(packet.toString()); } else {
throw new RuntimeException("assertion failed, invalid packet marker: " + packetMarker); } }

 接下来直接解析mysql binlog即可,header获取如下:

// Parse the event header final BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl(); header.setTimestamp(is.readLong(4) * 1000L);//timestamp header.setEventType(is.readInt(1));//type_code header.setServerId(is.readLong(4));//server_id header.setEventLength(is.readInt(4));//event_length header.setNextPosition(is.readLong(4));//next_position header.setFlags(is.readInt(2));//flags header.setBinlogFileName(this.binlogFileName); header.setTimestampOfReceipt(System.currentTimeMillis()); body的解析不同事件格式不一样,类似header那样取值 注:代码来自开源项目open replicator

转载地址:http://rrwum.baihongyu.com/

你可能感兴趣的文章
Ubuntu16.04下安装配置xammp
查看>>
Hystrix的配置属性优先级和详解
查看>>
Activiti和Spring集成
查看>>
springmvc+spring+mybatis整合
查看>>
20.7 if特殊用法
查看>>
2.18 特殊权限set_uid 2.19 特殊权限set_gid 2.20 特殊权限stick_bit 2.21 软链接文件 2.22 硬连接文件...
查看>>
DBCP数据源配置分析
查看>>
Retroifit2
查看>>
防止未登录用户操作—struts2拦截器
查看>>
安装hbase
查看>>
Matter App提供了一个由BCH推动的长格式博客平
查看>>
vim操作
查看>>
sed
查看>>
Vue.js 搭建
查看>>
C++ 友元学习
查看>>
11.10/11.11/11.12 安装PHP5 11.13 安装PHP7
查看>>
正则介绍 、grep、egrep工具
查看>>
架构师视角:对JVM架构进行解析
查看>>
UINavigationController 自定义转场动画(模仿淘宝App跳转)
查看>>
转载 java中什么是bridge method(桥接方法)
查看>>