Skip to content

add new eventlog.py to examples #1799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open

Conversation

NtAlexio2
Copy link
Contributor

This PR adds new file to examples folder. The file is added in order to PR #1798 and user is able to remove remote hosts eventlogs using this script. This is a very basic example of EVEN6 protocol usage. more features/options planned for future and also open to work by others ;)

@anadrianmanrique anadrianmanrique added Examples in review This issue or pull request is being analyzed labels Oct 3, 2024
@anadrianmanrique
Copy link
Collaborator

Hi, thanks for the PR. Lets continue working in the context of this PR your ideas. Which other use cases would be great to add? Would it be possible to remove event logs for a specific event ID ?

@NtAlexio2
Copy link
Contributor Author

in a very basic scenario we can do Query (Predefined and/or Custom), Export and Subscribe logs. also it is possible to list log channels on remote computer. we can add them all one by one

@NtAlexio2
Copy link
Contributor Author

Would it be possible to remove event logs for a specific event ID ?

Doing so from a remote computer is impossible as I know. The remote api EvtRpcClearLog() only gets the log path (not event ID). but we can use other trick(s) to remove specific event ID. Maybe:

  1. Backup logs to specific path
  2. Download the .evtx file
  3. Parse and modify .evtx file (remove desired records)
  4. Replace the new version on disk (restore)

I didn't test this before but it should work in theory. Don't forget that the SMB protocol SHOULD be enable for file operations.

@marcobarlottini
Copy link
Contributor

marcobarlottini commented Apr 8, 2025

@NtAlexio2 Great example! I wondered how would you do to add the export log feature, i tried to use your code

def clear(self, addr):
       stringbinding = self.__get_endpoint(addr)
       rpctransport = transport.DCERPCTransportFactory(stringbinding)

       if hasattr(rpctransport, 'set_credentials'):
           # This method exists only for selected protocol sequences.
           rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
                                        self.__aesKey)
           rpctransport.set_kerberos(self.__doKerberos, self.__kdcHost)
       try:
           self.__do_clear(rpctransport)
       except Exception as e:
           if logging.getLogger().level == logging.DEBUG:
               import traceback
               traceback.print_exc()
           logging.error(e)

   def export(self, addr):
       stringbinding = self.__get_endpoint(addr)
       rpctransport = transport.DCERPCTransportFactory(stringbinding)

       if hasattr(rpctransport, 'set_credentials'):
           # This method exists only for selected protocol sequences.
           rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
                                           self.__aesKey)
           rpctransport.set_kerberos(self.__doKerberos, self.__kdcHost)
       try:
           self.__do_export(rpctransport)
       except Exception as e:
           if logging.getLogger().level == logging.DEBUG:
               import traceback
               traceback.print_exc()
           logging.error(e)

   def __get_endpoint(self, target):
       return hept_map(target, even6.MSRPC_UUID_EVEN6, protocol='ncacn_ip_tcp')

   def __do_clear(self, rpctransport):
       dce = rpctransport.get_dce_rpc()

       dce.set_credentials(*rpctransport.get_credentials())
       if self.__doKerberos is True:
           dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
       dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
       dce.connect()
       dce.bind(even6.MSRPC_UUID_EVEN6)

       resp = even6.hEvtRpcRegisterControllableOperation(dce)
       logging.info("EvtRpcRegisterControllableOperation - OK")

       even6.hEvtRpcClearLog(dce, resp['Handle'], self.__channelName)
       logging.info("EvtRpcClearLog - OK")

       even6.hEvtRpcClose(dce, resp['Handle'])
       logging.info("EvtRpcClose - OK")

   def __do_export(self, rpctransport):
       dce = rpctransport.get_dce_rpc()

       dce.set_credentials(*rpctransport.get_credentials())
       if self.__doKerberos is True:
           dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
       dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
       dce.connect()
       dce.bind(even6.MSRPC_UUID_EVEN6)

       resp = even6.hEvtRpcRegisterControllableOperation(dce)
       logging.info("EvtRpcRegisterControllableOperation - OK")

       even6.hEvtRpcExportLog(dce, resp['Handle'], self.__channelName, "*", "C:\\temp\\security.evtx")
       logging.info("hEvtRpcExportLog - OK")

       even6.hEvtRpcClose(dce, resp['Handle'])
       logging.info("EvtRpcClose - OK")

according to the MS-EVEN6 spec:

error_status_t EvtRpcExportLog(

 [in, context_handle] PCONTEXT_HANDLE_OPERATION_CONTROL control = {handle from step 1},

 [in, unique, range(0, MAX_RPC_CHANNEL_NAME_LENGTH), string]

 LPCWSTR channelPath = L"Application",

 [in, range(1, MAX_RPC_QUERY_LENGTH), string]

 LPCWSTR query = L"*",

 [in, range(1, MAX_RPC_FILE_PATH_LENGTH), string]

 LPCWSTR backupPath = L"c:\\backup\\application.evtx",

 [in] DWORD flags = 0x00000001 (EvtExportLogChannelPath),

 [out] RpcInfo* error
);

but i get the ERROR:root:rpc_x_bad_stub_data

@marcobarlottini
Copy link
Contributor

hey, i tried to execute your tests for EvtRpcExportLog and EvtRpcClearLog (#1798) but while EvtRpcClearLog passed EvtRpcExportLog did not pass, i am using a standard Windows 10 box with a local administrator account for the test

platform linux -- Python 3.10.12, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/ubuntu/impacket/impacket
configfile: tox.ini
plugins: cov-6.1.1
collected 1931 items / 1923 deselected / 8 selected                                                                                          

tests/dcerpc/test_even6.py ..F...F.                                                                                                    [100%]

================================================================== FAILURES ==================================================================
________________________________________________ EVEN6TestsTCPTransport.test_hEvtRpcExportLog ________________________________________________

self = <tests.dcerpc.test_even6.EVEN6TestsTCPTransport testMethod=test_hEvtRpcExportLog>

    def test_hEvtRpcExportLog(self):
        dce, rpctransport = self.connect()
    
        resp = even6.hEvtRpcRegisterControllableOperation(dce)
        resp.dump()
    
        control_handle = resp['Handle']
    
>       resp = even6.hEvtRpcExportLog(dce, control_handle, 'Security\x00', '*\x00', 'C:\\Security_Log_Exported.evtx\x00')

tests/dcerpc/test_even6.py:63: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
impacket/dcerpc/v5/even6.py:391: in hEvtRpcExportLog
    resp = dce.request(request)
impacket/dcerpc/v5/rpcrt.py:861: in request
    answer = self.recv()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <impacket.dcerpc.v5.rpcrt.DCERPC_v5 object at 0x76359bb46080>

    def recv(self):
        finished = False
        forceRecv = 0
        retAnswer = b''
        while not finished:
            # At least give me the MSRPCRespHeader, especially important for
            # TCP/UDP Transports
            response_data = self._transport.recv(forceRecv, count=MSRPCRespHeader._SIZE)
            response_header = MSRPCRespHeader(response_data)
            # Ok, there might be situation, especially with large packets, that
            # the transport layer didn't send us the full packet's contents
            # So we gotta check we received it all
            while len(response_data) < response_header['frag_len']:
               response_data += self._transport.recv(forceRecv, count=(response_header['frag_len']-len(response_data)))
    
            off = response_header.get_header_size()
    
            if response_header['type'] == MSRPC_FAULT and response_header['frag_len'] >= off+4:
                status_code = unpack("<L",response_data[off:off+4])[0]
                if status_code in rpc_status_codes:
>                   raise DCERPCException(rpc_status_codes[status_code])
E                   impacket.dcerpc.v5.rpcrt.DCERPCException: rpc_x_bad_stub_data

impacket/dcerpc/v5/rpcrt.py:1329: DCERPCException
------------------------------------------------------------ Captured stdout call ------------------------------------------------------------
EvtRpcRegisterControllableOperationResponse 
Handle:                         
    context_handle_attributes:       0 
    context_handle_uuid:             b'\xafk\xefzw\x8c1N\x9b\xc25\xa9]\x1f\xb3V' 
Error:                           0 


_______________________________________________ EVEN6TestsTCPTransport64.test_hEvtRpcExportLog _______________________________________________

self = <tests.dcerpc.test_even6.EVEN6TestsTCPTransport64 testMethod=test_hEvtRpcExportLog>

    def test_hEvtRpcExportLog(self):
        dce, rpctransport = self.connect()
    
        resp = even6.hEvtRpcRegisterControllableOperation(dce)
        resp.dump()
    
        control_handle = resp['Handle']
    
>       resp = even6.hEvtRpcExportLog(dce, control_handle, 'Security\x00', '*\x00', 'C:\\Security_Log_Exported.evtx\x00')

tests/dcerpc/test_even6.py:63: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
impacket/dcerpc/v5/even6.py:391: in hEvtRpcExportLog
    resp = dce.request(request)
impacket/dcerpc/v5/rpcrt.py:861: in request
    answer = self.recv()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <impacket.dcerpc.v5.rpcrt.DCERPC_v5 object at 0x76359aff1cf0>

    def recv(self):
        finished = False
        forceRecv = 0
        retAnswer = b''
        while not finished:
            # At least give me the MSRPCRespHeader, especially important for
            # TCP/UDP Transports
            response_data = self._transport.recv(forceRecv, count=MSRPCRespHeader._SIZE)
            response_header = MSRPCRespHeader(response_data)
            # Ok, there might be situation, especially with large packets, that
            # the transport layer didn't send us the full packet's contents
            # So we gotta check we received it all
            while len(response_data) < response_header['frag_len']:
               response_data += self._transport.recv(forceRecv, count=(response_header['frag_len']-len(response_data)))
    
            off = response_header.get_header_size()
    
            if response_header['type'] == MSRPC_FAULT and response_header['frag_len'] >= off+4:
                status_code = unpack("<L",response_data[off:off+4])[0]
                if status_code in rpc_status_codes:
>                   raise DCERPCException(rpc_status_codes[status_code])
E                   impacket.dcerpc.v5.rpcrt.DCERPCException: rpc_x_bad_stub_data

impacket/dcerpc/v5/rpcrt.py:1329: DCERPCException
------------------------------------------------------------ Captured stdout call ------------------------------------------------------------
EvtRpcRegisterControllableOperationResponse 
Handle:                         
    context_handle_attributes:       0 
    context_handle_uuid:             b'\xff\x93\x86\xa8v\xeb\x8aI\xba\x1fr\xcc\xf9\xa5S\xf7' 
Error:                           0 


============================================================== warnings summary ==============================================================
../.venv/lib/python3.10/site-packages/pyasn1/codec/ber/encoder.py:952
  /home/ubuntu/impacket/.venv/lib/python3.10/site-packages/pyasn1/codec/ber/encoder.py:952: DeprecationWarning: tagMap is deprecated. Please use TAG_MAP instead.
    warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)

../.venv/lib/python3.10/site-packages/pyasn1/codec/ber/encoder.py:952
  /home/ubuntu/impacket/.venv/lib/python3.10/site-packages/pyasn1/codec/ber/encoder.py:952: DeprecationWarning: typeMap is deprecated. Please use TYPE_MAP instead.
    warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========================================================== short test summary info ===========================================================
FAILED tests/dcerpc/test_even6.py::EVEN6TestsTCPTransport::test_hEvtRpcExportLog - impacket.dcerpc.v5.rpcrt.DCERPCException: rpc_x_bad_stub_data
FAILED tests/dcerpc/test_even6.py::EVEN6TestsTCPTransport64::test_hEvtRpcExportLog - impacket.dcerpc.v5.rpcrt.DCERPCException: rpc_x_bad_stub_data
========================================== 2 failed, 6 passed, 1923 deselected, 2 warnings in 7.28s ==========================================

@NtAlexio2
Copy link
Contributor Author

@marcobarlottini I fixed the code. thanks for your review. there was a misspelling datatype in EvtRpcExportLog type. as you can see now it passes the test:


Running pytest with args: ['-p', 'vscode_pytest', '--rootdir=c:\\Dev\\impacket', 'c:\\Dev\\impacket\\tests\\dcerpc\\test_even6.py::EVEN6TestsTCPTransport64::test_hEvtRpcExportLog']
============================= test session starts =============================
platform win32 -- Python 3.11.5, pytest-8.3.4, pluggy-1.5.0
rootdir: c:\Dev\impacket
configfile: tox.ini
plugins: anyio-4.6.2.post1, cov-6.1.1, docker-3.1.1
collected 1 item

tests\dcerpc\test_even6.py .                                             [100%]

============================== 1 passed in 0.29s ==============================
Finished running tests!

@NtAlexio2
Copy link
Contributor Author

@anadrianmanrique Also I added -export functionality to the example file. sample output:

C:\Dev>python eventlog.py -export Security -path C:\windows\temp\security.evtx admin:[email protected]
Impacket v0.13.0.dev0 - Copyright Fortra, LLC and its affiliated companies

INFO:root:EvtRpcRegisterControllableOperation - OK
INFO:root:hEvtRpcExportLog - OK
INFO:root:EvtRpcClose - OK

C:\Dev>python smbclient.py admin:[email protected]
Impacket v0.13.0.dev0 - Copyright Fortra, LLC and its affiliated companies

Type help for list of commands
# use C$
# cd windows\temp
# pwd
/windows/temp
# ls
drw-rw-rw-          0  Wed May  7 23:55:16 2025 .
drw-rw-rw-          0  Sun Apr 20 05:07:31 2025 ..
-rw-rw-rw-   21041152  Wed May  7 23:55:16 2025 security.evtx
# exit

C:\Dev>

Security log successfully exported to disk as you can see.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Examples in review This issue or pull request is being analyzed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants