Статья Простой многопоточный сканер портов

  • Автор темы Cenzor
  • Дата начала
Хотелось бы показать маленькую реализацию простого, многопоточного сканера. Данный код можно использовать как дополнительный модуль в своей архитектуре, подключив его.
Сам сканер представляет из себя класс ScanIP с тремя методами.
В данном варианте, при создании экземпляра класса, его конструктор принимает кортеж, состоящий из двух переменных – аргументов командной строки – ip-адрес а и списка портов соответственно.
Python:
parser = argparse.ArgumentParser(usage='''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''')
    parser.add_argument('ip', action='store', type=str, help='spec target host')
    parser.add_argument('ports', action='store', type=str, help='spec target port')
    args = parser.parse_args()
    scan = ScanIP(args)
Первый аргумент командной строки принимает ip-адрес, второй – список портов. Список портов перечисляется через запятую, значения должны быть целочисленными, либо в виде диапазона через тире, например: 21,25-27,99,110,113,443,8000-8005.
В итоге получится список: 21,25,26,27,99,110,113,443,8000,8001,8002,8003,8004,8005.
Разбор строки со значениями портов происходит в конструкторе:
Python:
    def __init__(self, args):
        self.ip = args.ip
        self.ports = str(args.ports).split(',')
        self.port_list = []
        for port in self.ports:
            try:
                if '-' in port:
                    l2 = port.split('-')
                    for x in range(int(l2[0]), int(l2[1]) + 1):
                        self.port_list.append(x)
                else:
                    self.port_list.append(int(port))
            except ValueError as ve:
                print(str(ve) + '\nНеверно указан порт (диапазон портов)')
                sys.exit()
        self.portScan(self.ip, self.port_list)
В нём же вызываем метод portScan, в который передаётся два аргумента - ip-адрес и список портов.
В методе portScan список портов делим на части, равные количеству запускаемых потоков, в данном примере эта величина определена переменной size:
Python:
size = 256
        while len(port_list) > size:
            piece = port_list[:size]
            port_list_split.append(piece)
            port_list = port_list[size:]
        port_list_split.append(port_list)
После получения двумерного массива, для каждого элемента с вложенными значениями организовываем цикл и запускаем потоки, дожидаясь их завершения:
Python:
        for port_list in tqdm(port_list_split):
            for port in port_list:
                t = Thread(target=self.connScan, args=(ip, port))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
Таргет каждого потока - метод connScan, который в свою очередь отсылает на сканируемый ip-адрес данные для проверки ответа. Если ответ получен - значит порт открыт:
Python:
    def connScan(self, ip, port):
        try:
            connSkt = socket(AF_INET, SOCK_STREAM)
            connSkt.connect((ip, port))
            connSkt.send('fffffffffffffffffffffff\r\n'.encode())
            results = connSkt.recv(100)
            self.screenLock.acquire()
            self.open_ports.append('[+]{0} open'.format(port))
        except Exception as ex:
            self.screenLock.acquire()
        finally:
            self.screenLock.release()
            connSkt.close()
После окончания сканирования выводим информацию в консоль.
Полный код под спойлером
Python:
import argparse
import sys
from socket import *
from threading import *
from tqdm import tqdm


class ScanIP():

    screenLock = Semaphore(value=1)
    open_ports = []

    def __init__(self, args):
        self.ip = args.ip
        self.ports = str(args.ports).split(',')
        self.port_list = []
        for port in self.ports:
            try:
                if '-' in port:
                    l2 = port.split('-')
                    for x in range(int(l2[0]), int(l2[1]) + 1):
                        self.port_list.append(x)
                else:
                    self.port_list.append(int(port))
            except ValueError as ve:
                print(str(ve) + '\nНеверно указан порт (диапазон портов)')
                sys.exit()
        self.portScan(self.ip, self.port_list)

    def portScan(self, ip, port_list):
        setdefaulttimeout(1)
        threads = []
        port_list_split = []
        size = 256
        while len(port_list) > size:
            piece = port_list[:size]
            port_list_split.append(piece)
            port_list = port_list[size:]
        port_list_split.append(port_list)
        for port_list in tqdm(port_list_split):
            for port in port_list:
                t = Thread(target=self.connScan, args=(ip, port))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
        for item in self.open_ports:
            print(item)

    def connScan(self, ip, port):
        try:
            connSkt = socket(AF_INET, SOCK_STREAM)
            connSkt.connect((ip, port))
            connSkt.send('fffffffffffffffffffffff\r\n'.encode())
            results = connSkt.recv(100)
            self.screenLock.acquire()
            self.open_ports.append('[+]{0} open'.format(port))
        except Exception as ex:
            self.screenLock.acquire()
        finally:
            self.screenLock.release()
            connSkt.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(usage='''%(prog)s [опции]\nДля вызова помощи: %(prog)s -h''')
    parser.add_argument('ip', action='store', type=str, help='spec target host')
    parser.add_argument('ports', action='store', type=str, help='spec target port')
    args = parser.parse_args()
    scan = ScanIP(args)
 
Мы в соцсетях:

Обучение наступательной кибербезопасности в игровой форме. Начать игру!