Python Raw Socket使用示例(发送TCP SYN数据包)

1. TCP SYN扫描

端口扫描常用于用于探测服务器或主机开放端口情况,被计算机管理员用于确认安全策略,同时被攻击者用于识别目标主机上的可运作的网络服务。端口扫描 是向一定范围的服务器端口发送对应请求,以此确认可使用的端口。虽然其本身并不是恶意的网络活动,但也是网络攻击者探测目标主机服务,以利用该服务的已知 漏洞的重要手段。

TCP SYN扫描是端口扫描众多方式中的一种,其他方式包括TCP扫描,UDP扫描,ACK扫描,窗口扫描和FIN扫描等。

TCP SYN扫描是另一种TCP扫描。端口扫描工具不使用操作系统原生网络功能,而是自行生成、发送IP数据包,并监控其回应。这种扫描模式被称为“半开放扫 描”,因为它从不建立完整的TCP连接。端口扫描工具生成一个SYN包,如果目标端口开放,则会返回SYN-ACK包。扫描端回应一个RST包,然后在握 手完成前关闭连接。如果端口关闭了但未使用过滤,目标端口应该会持续返回RST包。

TCP SYN扫描优点:

给扫描工具全权控制数据包发送和等待回应时长的权力,允许更详细的回应分析。

SYN扫描从不会建立完整的连接。

2. python 代码

使用raw socket进行SYN 洪泛,封装多个函数使其模块化和易于理解。利用结构体可以方便的使用格式化字符串和变量列表来编码数据包。

import sys
import time

import socket
import struct

import random

def SendPacketData (Buffer = None , DestIP = "127.0.0.1" , DestPort = 0) :
    """SendPacketData"""
    
    if Buffer is None :
        return False
    
    try:
        Socket = socket.socket(socket.AF_INET,socket.SOCK_RAW)
        Socket.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
        Socket.setsockopt(socket.SOL_SOCKET,socket.SO_SNDTIMEO,2000)
 
    except:
        Socket.close()
        return False
    
    try:
        Socket.sendto(Buffer,0,(DestIP , DestPort))
            
    except:
        Socket.close()
            
    return True

def SetPacketAddress (Number = 1) :
    """SetPakcetAddress"""
        
    return [".".join(["%d" % random.randint(1 , 0xFF) for i in range(0,4)]) for n in range(0 , Number)]

def SetPacketData (Length = 32) :
    """SetPacketData"""

    return "".join(["%s" % chr(random.randint(0 , 255)) for n in range(Length)])

def SetPacketCheckSum (Buffer = None) :
    """SetPacketCheckSum"""
    
    if Buffer == None :
        return False
    
    if len(Buffer) % 2 :
        Buffer += '/0'

    return ~sum([(ord(Buffer[n + 1]) << 8) + ord(Buffer[n]) for n in range(0 , len(Buffer) , 2)])
#or return ~sum([ord(Buffer[n]) + ord(Buffer[n + 1]) * 256  for n in range(0 , len(Buffer) , 2)])

def SynSendInit (DestIP = "127.0.0.1" , DestPort = 80) :
    """SynSendInit"""
    
    IpHdrLen = 20
    TcpHdrLen = 20
    
    IpVerlen    = (4 << 4 | IpHdrLen / 4)
    IpTotal_len = socket.htons(IpHdrLen + TcpHdrLen)
    
    IpDestIP = struct.unpack('i',socket.inet_aton(DestIP))[0]
    IpSourceIP = struct.unpack('i',socket.inet_aton(SetPacketAddress()[0]))[0]
    
    TcpSport = socket.htons(random.randint(1024 , 65000))
    TcpDport = socket.htons(DestPort)
    
    TcpLenres = (TcpHdrLen / 4 << 4 | 0)
    TcpSeq = socket.htonl(int(time.time()))
    
    Buffer = struct.pack("LLBBHHHLLBBHHH",IpSourceIP,IpDestIP,0,socket.IPPROTO_TCP,socket.htons(TcpHdrLen)
                                         ,TcpSport,TcpDport,TcpSeq,0,TcpLenres,2,socket.htons(8192),0,0)
    TcpChecksum = SetPacketCheckSum(Buffer)
    
    Buffer = struct.pack("BBHHHBBHLL",IpVerlen,0,IpTotal_len,1,0x40,255,socket.IPPROTO_TCP,0,IpSourceIP,IpDestIP)
    IpChecksum = SetPacketCheckSum(Buffer)
    
    tcpcsp = struct.pack("L",TcpChecksum)
    ipcsp = struct.pack("L",IpChecksum)
    
    return struct.pack("BBHHHBBHLLHHLLBBHHH",IpVerlen,0,IpTotal_len,1,0x40,255,socket.IPPROTO_TCP
                                            ,(ord(ipcsp[1]) << 8) + ord(ipcsp[0]),IpSourceIP,IpDestIP
                                            ,TcpSport,TcpDport,TcpSeq,0,TcpLenres,2,socket.htons(8192)
                                            ,(ord(tcpcsp[1]) << 8) + ord(tcpcsp[0]),0)

def SynSendMain (DestIP = "127.0.0.1" , DestPort = 80) :
    """SynSendMain"""
    
    return SendPacketData(SynSendInit(DestIP , DestPort) , DestIP , DestPort)

if __name__ == "__main__" :
    SynSendMain()

http://blog.csdn.net/dholer/article/details/6804124

    /bin/env python  
    # -*- coding: UTF-8 -*-   
     
    # 必须以root权限运行  
     
    import socket  
    import sys  
    import time  
     
    from struct import *  
     
    # 计算校验和  
    def checksum(msg):  
        s = 0 
        # 每次取2个字节  
        for i in range(0,len(msg),2):  
            w = (ord(msg[i]) << 8) + (ord(msg[i+1]))  
            s = s+w  
     
        s = (s>>16) + (s & 0xffff)  
        s = ~s & 0xffff 
     
        return s  
     
    def CreateSocket(source_ip,dest_ip):  
        try:  
            s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)  
        except socket.error, msg:  
            print 'Socket create error: ',str(msg[0]),'message: ',msg[1]  
            sys.exit()  
     
        # 设置手工提供IP头部  
        s.setsockopt(socket.IPPROTO_TCP, socket.IP_HDRINCL, 1)  
        return s  
     
    # 创建IP头部  
    def CreateIpHeader(source_ip, dest_ip):  
        packet = '' 
     
        # ip 头部选项  
        headerlen = 5 
        version = 4 
        tos = 0 
        tot_len = 20 + 20 
        id = random.randrange(18000,65535,1)  
        frag_off = 0 
        ttl = 255 
        protocol = socket.IPPROTO_TCP  
        check = 10 
        saddr = socket.inet_aton ( source_ip )  
        daddr = socket.inet_aton ( dest_ip )  
        hl_version = (version << 4) + headerlen  
        ip_header = pack('!BBHHHBBH4s4s', hl_version, tos, tot_len, id, frag_off, ttl, protocol, check, saddr, daddr)  
     
        return ip_header  
     
    # 创建TCP头部  
    def create_tcp_syn_header(source_ip, dest_ip, dest_port):  
        # tcp 头部选项  
        source = random.randrange(32000,62000,1)    # 随机化一个源端口  
        seq = 0 
        ack_seq = 0 
        doff = 5 
        # tcp flags  
        fin = 0 
        syn = 1 
        rst = 0 
        psh = 0 
        ack = 0 
        urg = 0 
        window = socket.htons (8192)    # 最大窗口大小  
        check = 0 
        urg_ptr = 0 
        offset_res = (doff << 4) + 0 
        tcp_flags = fin + (syn<<1) + (rst<<2) + (psh<<3) + (ack<<4) + (urg<<5)  
        tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, check, urg_ptr)  
        # 伪头部选项  
        source_address = socket.inet_aton( source_ip )  
        dest_address = socket.inet_aton( dest_ip )  
        placeholder = 0 
        protocol = socket.IPPROTO_TCP  
        tcp_length = len(tcp_header)  
        psh = pack('!4s4sBBH', source_address, dest_address, placeholder, protocol, tcp_length);  
        psh = psh + tcp_header;  
        tcp_checksum = checksum(psh)  
     
        # 重新打包TCP头部,并填充正确地校验和  
        tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, tcp_checksum, urg_ptr)  
        return tcp_header  
     
     
    def range_scan(source_ip, dest_ip, start_port, end_port) :  
        syn_ack_received = []   # 开放端口存储列表  
     
        for j in range (start_port, end_port) :  
            s = CreateSocket(source_ip, dest_ip)  
            ip_header = CreateIpHeader(source_ip, dest_ip)  
            tcp_header = create_tcp_syn_header(source_ip, dest_ip,j)  
            packet = ip_header + tcp_header  
     
            s.sendto(packet, (dest_ip, 0))  
     
            data = s.recvfrom(1024) [0][0:]  
     
            ip_header_len = (ord(data[0]) & 0x0f) * 4 
            ip_header_ret = data[0: ip_header_len - 1]  
            tcp_header_len = (ord(data[32]) & 0xf0)>>2 
            tcp_header_ret = data[ip_header_len:ip_header_len+tcp_header_len - 1]  
     
            if ord(tcp_header_ret[13]) == 0x12: # SYN/ACK flags   
                syn_ack_received.append(j)  
        return syn_ack_received  
     
     
    # 程序从这里开始:  
    open_port_list = []  
    ipsource = '192.168.1.95' 
    ipdest = '192.168.1.31' 
    start = 100 
    stop = 450 
    step = (stop-start)/10 
    scan_ports = range(start, stop, step)  
    if scan_ports[len(scan_ports)-1] < stop:  
        scan_ports.append(stop)  
    for i in range(len(scan_ports)-1):  
        opl = range_scan(ipsource, ipdest, scan_ports[i], scan_ports[i+1])  
        open_port_list.append(opl)  
    for i in range(len(open_port_list)):  
        print 'Process #: ',i,' Open ports: ',open_port_list[i]  
    print 'A list of all open ports found: ' 
    for i in range(len(open_port_list)):  
        for j in range(len(open_port_list[i])):  
            print open_port_list[i][j],', ' 

发表评论

电子邮件地址不会被公开。 必填项已用*标注