Fork me on GitHub

daydayup863

人生就像一杯茶,不会苦一辈子,但总会苦一阵子。

0%

使用go去抓取PostgreSQL端口的SQL

之前被问到tcpdump去抓取PostgreSQL的SQL时,怎么将SQL解析成可读内容,知道有wireshark可以解析tcpdump生成的cap文件,但不知道wireshake还有针对pg协议专门的扩展, 一直也没遇到过这种需求。最近在学习go语言,于是就想到能否用go直接抓包解析呢。

这篇文章就主要以获取SQL为主。

抓包需要使用gopacket包,详细的教程

原理

客户端向数据库发送SQL时,例如最简单的select 1,数据库端收到的是如下字节数组

1
[81 0 0 0 14 115 101 108 101 99 116 32 49 59 0]

其中81表示Q(Query),
0 0 0 14 表示一个int32的长度,该值-5(协议类型Q+自身长度4) 表示Query实际长度,因此实际Query byte数组为 115 101 108 101 99 116 32 49 59

使用函数转成字符如下:

1
2
3
4
5
mydb=# select chr(115), chr(101), chr(108), chr(101), chr(99), chr(116), chr(32), chr(49), chr(59);
chr | chr | chr | chr | chr | chr | chr | chr | chr
-----+-----+-----+-----+-----+-----+-----+-----+-----
s | e | l | e | c | t | | 1 | ;
(1 row)

可以看到刚好是”select 1;”, 完美的取到整个SQL.

实际运行效果如下所示:

avatar

解析’Q’相对来说是比较简单的,要想实现其它类型的协议,是一个不小的工程。

其他前后端协议

前端发送的协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
'p': "Password message"
'Q': "Simple query"
'P': "Parse"
'B': "Bind"
'E': "Execute"
'D': "Describe"
'C': "Close"
'H': "Flush"
'S': "Sync"
'F': "Function call"
'd': "Copy data"
'c': "Copy completion"
'f': "Copy failure"
'X': "Termination"
'0': "NULL"

后端发送的协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
'R': "Authentication request"
'K': "Backend key data"
'S': "Parameter status"
'1': "Parse completion"
'2': "Bind completion"
'3': "Close completion"
'C': "Command completion"
't': "Parameter description"
'T': "Row description"
'D': "Data row"
'I': "Empty query"
'n': "No data"
'E': "Error"
'N': "Notice"
's': "Portal suspended"
'Z': "Ready for query"
'A': "Notification"
'V': "Function call response"
'G': "CopyIn response"
'H': "CopyOut response"
'd': "Copy data"
'c': "Copy completion"
0: NULL

完成代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main

import (
"fmt"
"github.com/google/gopacket"
"time"
"log"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/layers"
"net"
)

type PgBaseLayer struct {
flag byte
length []byte
AnotherByte []byte
}

func (baselayer *PgBaseLayer) DecodeStructFromBytes(data []byte) error {
baselayer.flag = data[0]
baselayer.length = data[1:5]
baselayer.AnotherByte = data[5:]
return nil
}

var PgBaseLayerType = gopacket.RegisterLayerType(
2001,
gopacket.LayerTypeMetadata{
"PgBaseLayerType",
gopacket.DecodeFunc(decodePgBaseLayer),
},
)

func (l PgBaseLayer) LayerType() gopacket.LayerType {
return PgBaseLayerType
}

func (l PgBaseLayer) LayerContents() []byte {
return []byte{l.flag}
}

//自定义层实现LayerPayload
func (l PgBaseLayer) LayerPayload() []byte {
return l.AnotherByte
}

func decodePgBaseLayer(data []byte, p gopacket.PacketBuilder) error {
p.AddLayer(&PgBaseLayer{data[0], data[1:5], data[5:]})
return p.NextDecoder(gopacket.LayerTypePayload)
}

type Query struct{
sqlstring string;
}

type Client struct{
Seq uint32
Protocol layers.IPProtocol
SrcIP net.IP
SrcPort layers.TCPPort
DstIP net.IP
DstPort layers.TCPPort
}

func GetClientInfo(packet gopacket.Packet, tcp *layers.TCP) Client {
var client Client
client.Seq = tcp.Seq
client.SrcPort = tcp.SrcPort
client.DstPort = tcp.DstPort
ipLayer := packet.Layer(layers.LayerTypeIPv4)
if ipLayer != nil {
ip, _ := ipLayer.(*layers.IPv4)
// IP layer variables:
// Version (Either 4 or 6)
// IHL (IP Header Length in 32-bit words)
// TOS, Length, Id, Flags, FragOffset, TTL, Protocol (TCP?),
// Checksum, SrcIP, DstIP
client.Protocol = ip.Protocol
client.SrcIP = ip.SrcIP
client.DstIP = ip.DstIP
}

return client
}


var (
device string = "lo"
snapshot_len int32 = 1024
promiscuous bool = false
err error
timeout time.Duration = -1 * time.Second /*实时输出*/
handle *pcap.Handle
)


func main() {
handle, err = pcap.OpenLive(device, snapshot_len, promiscuous, timeout)
if err != nil {log.Fatal(err) }
defer handle.Close()

// Set filter
var filter string = "tcp and port 5557" /* 只抓取pg端口5557上的tcp*/
err = handle.SetBPFFilter(filter)
if err != nil {
log.Fatal(err)
}

fmt.Println("Only capturing TCP port 5557 packets.")

packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
for packet := range packetSource.Packets() {
tcpLayer := packet.Layer(layers.LayerTypeTCP)
if tcpLayer == nil {continue}

tcp, _ := tcpLayer.(*layers.TCP)
if tcp.Payload == nil || len(tcpLayer.LayerPayload()) <= 0 { continue }

client := GetClientInfo(packet, tcp)

baselayer := PgBaseLayer{}
baselayer.DecodeStructFromBytes(tcp.Payload)

switch baselayer.flag{
/* new connect */
case 0:
fmt.Println("receive new connection FROM %s:%s", client.SrcIP, tcp.SrcPort)
break;

/* simple query */
case 'Q':
fmt.Printf("Protocol: %s FROM %s:%s TO %s:%s, SQL: %s\n", client.Protocol, client.SrcIP, tcp.SrcPort, client.DstIP, tcp.DstPort, string(baselayer.AnotherByte))
break;

/*
case 'T':
var nattrs = baselayer.AnotherByte[1]
fmt.Println(nattrs)
fmt.Println(baselayer.AnotherByte)
fmt.Println(string(baselayer.AnotherByte))
*/

/* found error */
/*
case 'E':
fmt.Println(baselayer.AnotherByte)
fmt.Println(string(baselayer.AnotherByte))
*/

default:
//fmt.Println("does not support protocol: ", baselayer.flag)
break;
}

}
}

参考

https://www.postgresql.org/docs/current/protocol-message-formats.html
https://www.devdungeon.com/content/packet-capture-injection-and-analysis-gopacket#decoding-packet-layers

-------------本文结束感谢您的阅读-------------
听说,打赏我的人都找到了真爱

欢迎关注我的其它发布渠道