Open
Description
使用特定 IP 作为触发命令的指令,用于爬虫无感切换 IP。
例如:访问特定地址 http://1.0.9.9 时执行一段 shell 脚本、调用一个切换 IP 的 HTTP API,或者切换 SOCKS5 的账号密码。这对爬虫程序非常有用。
AI 生成的示例:
import asyncio
import aiohttp
import socket
import struct
import logging
import random
from aiohttp import web
# Configure the root logger so INFO-level and higher records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by ProxyServer for status and error messages.
logger = logging.getLogger(__name__)
class ProxyServer:
    """Local SOCKS5 + HTTP forward proxy with a magic "switch IP" address.

    Ordinary traffic is relayed through a randomly chosen entry of
    ``upstream_proxies``.  A request whose destination is ``SWITCH_ADDRESS``
    is never relayed; it calls :meth:`switch_proxy` instead, so a crawler can
    rotate its exit IP simply by "visiting" that address.

    The SOCKS5 listener binds ``port`` and the HTTP listener ``port + 1``.

    Each upstream entry is a dict with ``'host'`` and ``'port'`` keys and is
    treated as an HTTP proxy on both paths: the HTTP handler passes it to
    aiohttp as ``http://host:port`` and the SOCKS5 handler opens an HTTP
    ``CONNECT`` tunnel through it.
    """

    # Destination that triggers a proxy switch instead of being relayed.
    SWITCH_ADDRESS = '1.0.9.9'
    # Endpoint queried by switch_proxy(); expected to answer with the new IP.
    SWITCH_API_URL = 'http://api.example.com/switch_proxy'
    # Headers that must not be copied between the two legs of the proxy.
    # Hop-by-hop headers describe a single connection; Content-Encoding and
    # Content-Length are dropped because aiohttp hands us already-decoded
    # bodies, so the original values no longer describe the payload.
    _DROPPED_HEADERS = frozenset({
        'connection',
        'content-encoding',
        'content-length',
        'keep-alive',
        'proxy-authenticate',
        'proxy-authorization',
        'proxy-connection',
        'te',
        'trailer',
        'transfer-encoding',
        'upgrade',
    })

    def __init__(self, host='0.0.0.0', port=8080):
        """Store the bind address; no sockets are opened until start_server().

        Args:
            host: Interface to bind both listeners to.
            port: SOCKS5 port; the HTTP listener uses ``port + 1``.
        """
        self.host = host
        self.port = port
        self.upstream_proxies = []  # upstream proxy pool
        self.current_ip = None

    @staticmethod
    def _socks5_reply(code):
        """Build a SOCKS5 reply with status *code* and a zero IPv4 bind address."""
        return bytes((0x05, code, 0x00, 0x01)) + bytes(6)

    @classmethod
    def _forwardable_headers(cls, headers):
        """Return a mutable copy of *headers* without ``_DROPPED_HEADERS``.

        Working on the multidict copy (instead of ``dict(headers)``) keeps
        repeated headers such as ``Set-Cookie`` intact.
        """
        forwarded = headers.copy()
        for name in cls._DROPPED_HEADERS:
            forwarded.popall(name, None)
        return forwarded

    async def _open_upstream_tunnel(self, upstream_proxy, addr, port):
        """Open an HTTP CONNECT tunnel to ``addr:port`` via *upstream_proxy*.

        Returns:
            ``(reader, writer)`` positioned at the start of the tunnelled
            byte stream.

        Raises:
            ValueError: *addr* is empty or contains whitespace/control
                characters (it comes from the client and is written into an
                HTTP request line, so this blocks header injection).
            ConnectionError: the upstream did not answer with a 2xx status.
            OSError: the TCP connection to the upstream failed.
        """
        if not addr or any(ch <= ' ' or ch == '\x7f' for ch in addr):
            raise ValueError(f'invalid destination address: {addr!r}')
        # IPv6 literals must be bracketed in an authority component.
        target = f'[{addr}]:{port}' if ':' in addr else f'{addr}:{port}'

        remote_reader, remote_writer = await asyncio.open_connection(
            upstream_proxy['host'], upstream_proxy['port'])
        try:
            request = f'CONNECT {target} HTTP/1.1\r\nHost: {target}\r\n\r\n'
            remote_writer.write(request.encode('ascii'))
            await remote_writer.drain()

            status_line = await remote_reader.readline()
            fields = status_line.split(None, 2)
            if (len(fields) < 2 or len(fields[1]) != 3
                    or not fields[1].startswith(b'2')):
                raise ConnectionError(
                    f'upstream proxy refused CONNECT: {status_line!r}')
            # Discard the response headers up to the blank line (or EOF).
            while (await remote_reader.readline()) not in (b'\r\n', b'\n', b''):
                pass
        except BaseException:
            # Do not leak the half-open upstream socket; re-raise unchanged.
            remote_writer.close()
            raise
        return remote_reader, remote_writer

    async def handle_socks5(self, reader, writer):
        """Serve one SOCKS5 client connection (no-auth, CONNECT only).

        Performs the method negotiation and request parsing, then either
        triggers :meth:`switch_proxy` (destination ``SWITCH_ADDRESS``) or
        tunnels the connection through a random upstream proxy.  Both the
        client and the upstream socket are always closed on exit.
        """
        remote_writer = None
        # True between the method-selection message and the request reply,
        # i.e. while the client is still owed a SOCKS5 reply on failure.
        reply_pending = False
        try:
            # Method negotiation: VER, NMETHODS, then NMETHODS method bytes.
            # All of them must be consumed or the request is mis-parsed.
            version, method_count = await reader.readexactly(2)
            offered_methods = await reader.readexactly(method_count)
            if version != 5 or 0 not in offered_methods:
                writer.write(b'\x05\xff')  # no acceptable methods
                await writer.drain()
                return
            writer.write(b'\x05\x00')
            await writer.drain()
            reply_pending = True

            # Request: VER, CMD, RSV, ATYP, DST.ADDR, DST.PORT
            version, cmd, _, address_type = struct.unpack(
                '!BBBB', await reader.readexactly(4))
            if address_type == 1:  # IPv4
                addr = socket.inet_ntoa(await reader.readexactly(4))
            elif address_type == 3:  # Domain name
                (domain_length,) = await reader.readexactly(1)
                addr = (await reader.readexactly(domain_length)).decode()
            elif address_type == 4:  # IPv6
                addr = socket.inet_ntop(
                    socket.AF_INET6, await reader.readexactly(16))
            else:
                reply_pending = False
                writer.write(self._socks5_reply(0x08))  # address type not supported
                await writer.drain()
                return
            (port,) = struct.unpack('!H', await reader.readexactly(2))

            if cmd != 1:  # only CONNECT is implemented
                reply_pending = False
                writer.write(self._socks5_reply(0x07))  # command not supported
                await writer.drain()
                return

            if addr == self.SWITCH_ADDRESS:  # magic address: switch proxy
                success = await self.switch_proxy()
                reply_pending = False
                writer.write(self._socks5_reply(0x00 if success else 0x01))
                await writer.drain()
                return

            # Connect to the destination through an upstream proxy.
            if not self.upstream_proxies:
                raise ConnectionError('no upstream proxies configured')
            upstream_proxy = random.choice(self.upstream_proxies)
            remote_reader, remote_writer = await self._open_upstream_tunnel(
                upstream_proxy, addr, port)

            reply_pending = False
            writer.write(self._socks5_reply(0x00))
            await writer.drain()
            await self.proxy_data(reader, writer, remote_reader, remote_writer)
        except asyncio.IncompleteReadError:
            logger.debug("SOCKS5 client disconnected during the handshake")
        except Exception:
            logger.exception("Error in SOCKS5 proxy")
            if reply_pending:
                # Best effort: tell the client the request failed.
                try:
                    writer.write(self._socks5_reply(0x01))
                    await writer.drain()
                except OSError:
                    logger.debug("Could not deliver SOCKS5 failure reply")
        finally:
            writer.close()
            if remote_writer is not None:
                remote_writer.close()

    async def handle_http(self, request):
        """Handle one HTTP proxy request.

        A request addressed to ``SWITCH_ADDRESS`` triggers
        :meth:`switch_proxy` and reports the outcome (200 / 500).  Any other
        request is replayed through a random upstream proxy and the upstream
        response is returned; 502 is returned when no upstream is configured
        or the upstream request fails.
        """
        # request.path only holds the path component, so the destination
        # host has to be taken from the full URL.
        if request.url.host == self.SWITCH_ADDRESS:
            success = await self.switch_proxy()
            if success:
                return web.Response(text="Proxy switched successfully", status=200)
            return web.Response(text="Failed to switch proxy", status=500)

        # Ordinary HTTP request.
        if not self.upstream_proxies:
            return web.Response(text="No upstream proxy configured", status=502)
        upstream_proxy = random.choice(self.upstream_proxies)
        proxy_url = f"http://{upstream_proxy['host']}:{upstream_proxy['port']}"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method=request.method,
                    url=request.url,
                    headers=self._forwardable_headers(request.headers),
                    data=await request.read(),
                    proxy=proxy_url,
                ) as resp:
                    body = await resp.read()
                    return web.Response(
                        body=body,
                        headers=self._forwardable_headers(resp.headers),
                        status=resp.status,
                    )
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            logger.error("Upstream HTTP request failed: %s", exc)
            return web.Response(text="Upstream proxy error", status=502)

    async def proxy_data(self, reader, writer, remote_reader, remote_writer):
        """Relay bytes in both directions until each side reaches EOF.

        Each direction closes its destination writer when its source ends or
        an error occurs; errors are logged, not raised.
        """
        async def pipe(source, sink):
            try:
                while True:
                    data = await source.read(8192)
                    if not data:
                        break
                    sink.write(data)
                    await sink.drain()
            except Exception as exc:
                logger.error("Error in proxy_data: %s", exc)
            finally:
                sink.close()

        await asyncio.gather(
            pipe(reader, remote_writer),
            pipe(remote_reader, writer),
        )

    async def switch_proxy(self):
        """Ask the switch API for a new exit IP.

        This is the hook to replace with an HTTP API call, a re-dial or any
        other rotation mechanism; the example queries ``SWITCH_API_URL``.

        Returns:
            True when the API answered 200 with a non-empty IP that differs
            from ``current_ip`` (which is then updated); False otherwise,
            including when the API cannot be reached.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(self.SWITCH_API_URL) as resp:
                    if resp.status != 200:
                        logger.warning(
                            "Proxy switch API returned HTTP %s", resp.status)
                        return False
                    # Strip so a trailing newline does not look like a new IP.
                    new_ip = (await resp.text()).strip()
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            logger.error("Proxy switch request failed: %s", exc)
            return False

        if not new_ip or new_ip == self.current_ip:
            return False
        self.current_ip = new_ip
        logger.info("Switched to new IP: %s", self.current_ip)
        return True

    async def start_server(self):
        """Run the SOCKS5 listener (``port``) and HTTP listener (``port + 1``).

        Blocks until cancelled; both listeners are shut down on exit.
        """
        socks_server = await asyncio.start_server(
            self.handle_socks5, self.host, self.port)
        async with socks_server:
            app = web.Application()
            app.router.add_route('*', '/{path:.*}', self.handle_http)
            runner = web.AppRunner(app)
            await runner.setup()
            try:
                http_server = web.TCPSite(runner, self.host, self.port + 1)
                await http_server.start()
                await socks_server.serve_forever()
            finally:
                await runner.cleanup()
if __name__ == '__main__':
    # Example upstream pool; extend the list with more upstream proxies.
    upstream_pool = [
        {'host': 'proxy1.example.com', 'port': 8080},
        {'host': 'proxy2.example.com', 'port': 8080},
    ]

    # Build the server with its default bind address, attach the pool and
    # run both listeners until the process is stopped.
    server = ProxyServer()
    server.upstream_proxies = upstream_pool
    asyncio.run(server.start_server())
Metadata
Metadata
Assignees
Labels
No labels