Source code for dscan.models.structures

#!/usr/bin/env python3
# encoding: utf-8

"""
structures.py
network elements of the scanner
"""
import struct
from enum import IntEnum
from dscan import log


[docs]class Status(IntEnum): """ Code values for the response status. """ SUCCESS = 0x00 UNAUTHORIZED = 0x01 FINISHED = 0x02 FAILED = 0xFF
[docs]class Operations(IntEnum): """ Commands available! """ AUTH = 0x01 READY = 0x02 COMMAND = 0x03 STATUS_REQ = 0x04 STATUS_RESP = 0x05 REPORT = 0x06
[docs]class Structure: __slots__ = () op_code = None _format = None HEADER = "<B" """ The object representation for the info exchanged between agents and servers. """ def __init__(self, *args, sock=None): if not sock: self.setData(*args) else: self.unpack(sock)
[docs] def setData(self, *args): fields = getattr(self, '__slots__', []) if len(args) != len(fields): raise TypeError("Expected {} args".format(len(fields))) for name, value in zip(fields, args): if isinstance(value, str): value = value.encode("ascii") setattr(self, name, value)
[docs] def unpack(self, sock): """ Unpacks a known command based on the predefined :param sock: :return: Instance of a message subclass """ if self._format: if isinstance(self._format, str): struct_size = struct.calcsize(self._format) data = struct.unpack(self._format, sock.recv(struct_size)) return self.setData(*data) else: size_fmt = self._format[0] sz_nbytes = struct.calcsize(size_fmt) sz_bytes = sock.recv(sz_nbytes) sz = struct.unpack(size_fmt, sz_bytes) data_fmt = self._format[1].format(*sz) dt_nbytes = struct.calcsize(data_fmt) dt_bytes = sock.recv(dt_nbytes) data = struct.unpack(data_fmt, dt_bytes) return self.setData(*data)
[docs] def pack(self): """ Returns a `struct.pack` string ready to be sent :return: `struct.pack` """ fmt = '' byte_order = '' values = [getattr(self, name, '') for name in self.__slots__] if self._format: if isinstance(self._format, str): if self._format.startswith(('<', '>', '!', '@')): byte_order = self._format[0] fmt = self._format[1:] fmt = f"{byte_order}B{fmt}" return struct.pack(fmt, self.op_code.value, *values) else: fmt = "{0}B{1}".format(*self._format) lengths = [len(getattr(self, name, '')) for name in self.__slots__ if isinstance(getattr(self, name, ''), bytes)] fmt = fmt.format(*lengths) return struct.pack(fmt, self.op_code.value, *lengths, *values)
[docs] @classmethod def create(cls, sock): try: op_size = struct.calcsize(cls.HEADER) op_bytes = sock.recv(op_size) if len(op_bytes) == 0: # agent disconnected ! return op, = struct.unpack(cls.HEADER, op_bytes) subs = cls.__subclasses__() for operation in subs: if operation.op_code.value == op: return operation(sock=sock) return None except (struct.error, ValueError) as e: log.info("Error parsing the message %s" % e) return None
[docs]class Auth(Structure): """ Authentication Request from an Agent to server! """ __slots__ = ('data', ) _format = "<128s" op_code = Operations.AUTH def __str__(self): return f"Auth(op_code={self.op_code}, data={self.data})"
[docs]class Ready(Structure): """ Ready to start Scan ! Sent by an Agent to the server With the client's current user id """ __slots__ = ('uid', 'alias') _format = ('<B', 'I{0}s') op_code = Operations.READY def __str__(self): return f"Ready({self.op_code}, uid={self.uid}, alias={self.alias})"
[docs]class Command(Structure): """ Scan task information ! Send by the server to the agent. final format is <BB?s?s """ __slots__ = ('target', "options") _format = ('<BB', '{0}s{1}s') op_code = Operations.COMMAND def __str__(self): return f"Command(op_code={self.op_code}, target={self.target}, " \ f"options={self.options})"
[docs]class Report(Structure): """ When an agent's task terminates, it initiates a report transfer request """ __slots__ = ('filesize', 'filename', 'filehash') _format = ('<BB', 'I{0}s{1}s') op_code = Operations.REPORT def __str__(self): return f"Report(op_code={self.op_code}, filesize={self.filesize}," \ f" filename={self.filename!s}, filehash={self.filehash})"