绕过SSH服务器的端口转发限制

0x00 背景

在某些场景下SSH服务器会禁用掉端口转发的能力,以降低安全风险。这会导致很多依赖SSH端口转发的工具无法正常工作。

这里主要是修改了/etc/ssh/sshd_config文件中以下几项实现的:

1
2
3
#AllowAgentForwarding yes
#AllowTcpForwarding yes
#X11Forwarding yes

此时,SSH服务器基本就变成了只能执行shell命令的工具,无法用于建立通信通道。

是否有办法可以绕过这一限制呢?答案是肯定的。

0x01 借尸还魂

SSH最常用的能力就是交互式命令行,所谓交互式命令行,就是允许用户进行实时输入,并将输出实时展示出来。也就是说:交互式命令行本身就是一个双向通信的通道。因此,可以编写一个程序,它会在初始化时与指定的服务器端口建立Socket连接,然后将所有stdin读到的数据实时发送给Socket,并将Socket接收到的数据写到stdout中,stderr则用于输出控制信息和日志等。

根据上面的分析,这个程序其实跟telnet命令非常相似,但又不完全相同。因此用GO写了下面这个程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main

import (
"bufio"
"fmt"
"net"
"os"
"os/signal"
"strconv"
"sync"
"time"
)

func telnet(host string, port int) int {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), 10*time.Second)
if err != nil {
fmt.Fprintln(os.Stderr, fmt.Sprintf("[FAIL] Connect %s:%d failed: %s", host, port, err))
return -1
}
defer conn.Close()
fmt.Fprintln(os.Stderr, "[OKAY]")

var wg sync.WaitGroup
wg.Add(2)
go handleWrite(conn, &wg)
go handleRead(conn, &wg)
wg.Wait()

return 0
}

func handleRead(conn net.Conn, wg *sync.WaitGroup) int {
defer wg.Add(-2)
reader := bufio.NewReader(conn)
buff := make([]byte, 4096)
for {
var bytes int
var err error
bytes, err = reader.Read(buff)
if err != nil {
fmt.Fprintln(os.Stderr, "Error to read from upstream because of", err)
return -1
}

_, err = os.Stdout.Write(buff[:bytes])
if err != nil {
fmt.Fprintln(os.Stderr, "Error to write to stdout because of", err)
return -1
}
}
}

func handleWrite(conn net.Conn, wg *sync.WaitGroup) int {
defer wg.Add(-2)
reader := bufio.NewReader(os.Stdin)
buff := make([]byte, 4096)
for {
var bytes int
var err error
bytes, err = reader.Read(buff)
if err != nil {
fmt.Fprintln(os.Stderr, "Error to read from stdin because of", err)
return -1
}

_, err = conn.Write(buff[:bytes])
if err != nil {
fmt.Fprintln(os.Stderr, "Error to write to upstream because of", err)
return -1
}
}

}

func main() {
if len(os.Args) < 3 {
fmt.Fprintln(os.Stderr, "Usage: telnet host port")
os.Exit(-1)
}
host := os.Args[1]
port, _ := strconv.Atoi(os.Args[2])
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func(){
for sig := range c {
// sig is a ^C, handle it
fmt.Fprintln(os.Stderr, "Signal", sig)
os.Exit(0)
}
}()
os.Exit(telnet(host, port))
}

完整的代码可以参考:telnet-go

0x02 暗度陈仓

要使用telnet-go提供的通信通道,需要与ParamikoASyncSSH之类的SSH库进行集成才行。下面是使用ASyncSSH进行集成的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class SSHProcessTunnel(SSHTunnel):
"""SSH Tunnel Over Process StdIn and StdOut"""

def __init__(self, tunnel, url, address):
super(SSHProcessTunnel, self).__init__(tunnel, url, address)
self._process = None

@classmethod
def has_cache(cls, url):
return False

async def _log_stderr(self):
while not self.closed():
error_line = await self._process.stderr.readline()
error_line = error_line.strip()
utils.logger.warn(
"[%s][stderr] %s" % (self.__class__.__name__, error_line.decode())
)
await asyncio.sleep(0.5)
self._process = None

async def connect(self):
ssh_conn = await self.create_ssh_conn()
if not ssh_conn:
return False
bin_path = self._url.path
cmdline = "%s %s %d" % (bin_path, self._addr, self._port)
self._process = await ssh_conn.create_process(cmdline, encoding=None)
await asyncio.sleep(0.5)

if self._process.exit_status is not None and self._process.exit_status != 0:
utils.logger.error(
"[%s] Create process %s failed: [%d]%s"
% (
self.__class__.__name__,
cmdline,
self._process.exit_status,
await self._process.stderr.read(),
)
)
return False
status_line = await self._process.stderr.readline()
if status_line.startswith(b"[OKAY]"):
utils.safe_ensure_future(self._log_stderr())
return True
elif status_line.startswith(b"[FAIL]"):
utils.logger.warn(
"[%s] Connect %s:%d failed: %s"
% (
self.__class__.__name__,
self._addr,
self._port,
status_line.decode(),
)
)
return False
else:
raise RuntimeError("Unexpected stderr: %s" % status_line.decode())

async def read(self):
if self._process:
buffer = await self._process.stdout.read(4096)
if buffer:
return buffer
raise utils.TunnelClosedError()

async def write(self, buffer):
if self._process:
return self._process.stdin.write(buffer)
else:
raise utils.TunnelClosedError()

def closed(self):
return self._process is None or self._process.exit_status is not None

def close(self):
if self._process:
self._process.stdin.write(b"\x03")

完整的代码可以参考:turbo-tunnel

turbo-tunnel中可以使用以下方法将流量转发给SSH服务器:

1
turbo-tunnel -l http://:8080/ -t ssh+process://root:password@1.1.1.1:2222/usr/local/bin/telnet

/usr/local/bin/telnettelnet-go在服务器上的路径,需要设置好可执行权限。

然后,本地通过http://127.0.0.1:8080代理访问的流量都会转发到ssh服务器上,从而实现了通过ssh服务器进行端口转发的目的。

0x03 总结

利用进程的实时输入输出,可以解决SSH服务器不支持端口转发的问题,从而绕过服务器限制,建立通信通道。这种方式应用场景更广,也更加隐蔽,只是使用上需要提前将一个文件拷贝到SSH服务器上,这里可能少数场景会有些阻碍(例如删除了chmod命令),需要寻找绕过这些限制的方法。

不过总的来说,使用这种方法,大大提升了建立SSH隧道的成功率,具有较大的实际应用价值。