go_digital_backend/file_server.py
2025-06-17 20:40:48 +08:00

142 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import html
import http.server
import logging
import mimetypes
import os
import socketserver
from urllib.parse import quote, unquote
# Configure logging: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class FileHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that serves audio files from the ``audio`` directory.

    GET requests are resolved relative to ``<cwd>/audio``. Directories are
    rendered as an HTML listing (sub-directories and audio files only) and
    audio files are sent as attachments. Anything outside the base
    directory, or with a non-audio extension, is rejected with 403.
    """

    # Extensions that may be listed and downloaded. Kept as a tuple so it can
    # be handed directly to ``str.endswith``.
    ALLOWED_EXTENSIONS = ('.wav', '.mp3', '.ogg', '.m4a', '.flac')

    def __init__(self, *args, **kwargs):
        # The base class handles the request inside its constructor, so the
        # base directory has to be set before calling super().__init__.
        self.base_directory = os.path.abspath(os.path.join(os.getcwd(), 'audio'))
        super().__init__(*args, directory=self.base_directory, **kwargs)

    def _is_within_base(self, candidate):
        """Return True if absolute path *candidate* is the base dir or below it.

        A plain ``startswith`` test would also accept sibling directories that
        merely share the prefix (``audio_secret`` vs ``audio``), so whole path
        components are compared instead.
        """
        try:
            return os.path.commonpath([self.base_directory, candidate]) == self.base_directory
        except ValueError:
            # Raised e.g. for paths on different drives: definitely outside.
            return False

    def _is_allowed_audio(self, name):
        """Return True if *name* ends with one of the allowed audio extensions."""
        return name.lower().endswith(self.ALLOWED_EXTENSIONS)

    @staticmethod
    def _content_disposition(filename):
        """Build a Content-Disposition header value for *filename*.

        Header values must be Latin-1 encodable and must not contain raw
        quotes or control characters, so such names get an ASCII fallback
        plus an RFC 5987 ``filename*`` parameter carrying the real name.
        """
        fallback = ''.join(
            ch if ' ' <= ch <= '~' and ch not in '"\\' else '_'
            for ch in filename
        )
        if fallback == filename:
            return f'attachment; filename="{filename}"'
        encoded = quote(filename, safe='', errors='replace')
        return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"

    def do_GET(self):
        """Handle a GET request: send a directory listing or an audio file."""
        try:
            # Drop the query string / fragment, then decode the URL path.
            request_path = self.path.split('?', 1)[0].split('#', 1)[0]
            path = unquote(request_path)
            # Resolve the full path of the requested resource.
            file_path = os.path.abspath(
                os.path.join(self.base_directory, path.lstrip('/'))
            )
            # Security check: the request must stay inside the audio directory.
            if not self._is_within_base(file_path):
                self.send_error(403, "Access denied")
                return
            # The target has to exist.
            if not os.path.exists(file_path):
                self.send_error(404, "File not found")
                return
            # Directories are rendered as a listing.
            if os.path.isdir(file_path):
                self.send_directory_listing(file_path)
                return
            # Only audio files may be downloaded.
            if not self._is_allowed_audio(file_path):
                self.send_error(403, "File type not allowed")
                return
            # Work out the MIME type, falling back to a generic binary type.
            content_type, _ = mimetypes.guess_type(file_path)
            if content_type is None:
                content_type = 'application/octet-stream'
            self.send_file(file_path, content_type)
        except ConnectionError as e:
            # The client went away; there is nobody left to answer.
            logging.info("Client disconnected: %s", e)
        except Exception as e:
            logging.exception("Error handling request: %s", e)
            # Do not leak exception details (e.g. server paths) to the client.
            self.send_error(500, "Internal server error")

    def send_file(self, file_path, content_type):
        """Stream *file_path* to the client as an attachment.

        Args:
            file_path: Absolute path of the file to send.
            content_type: Value for the Content-type header.
        """
        try:
            source = open(file_path, 'rb')
        except OSError as e:
            # Nothing has been sent yet, so a clean error response is possible.
            logging.error(f"Error sending file: {str(e)}")
            self.send_error(500, "Error reading file")
            return
        with source:
            size = os.fstat(source.fileno()).st_size
            self.send_response(200)
            self.send_header('Content-type', content_type)
            self.send_header('Content-Length', str(size))
            self.send_header(
                'Content-Disposition',
                self._content_disposition(os.path.basename(file_path)),
            )
            self.end_headers()
            try:
                # Copy in chunks instead of loading the whole file into memory.
                self.copyfile(source, self.wfile)
            except OSError as e:
                # Headers are already out; a second response would corrupt the
                # stream, so just log and drop the connection.
                logging.error(f"Error sending file: {str(e)}")
                self.close_connection = True

    def send_directory_listing(self, directory):
        """Send an HTML listing of *directory* (sub-directories and audio files).

        The page is built completely before any header is written, so a
        failure while reading the directory still yields a clean 500.

        Args:
            directory: Absolute filesystem path of the directory to list.
        """
        try:
            # Derive the canonical URL of this directory from its position
            # below the base directory; this keeps links correct regardless
            # of trailing slashes, query strings or '..' in the request.
            try:
                rel = os.path.relpath(directory, self.base_directory)
            except ValueError:
                rel = os.curdir
            if rel == os.curdir or rel.split(os.sep)[0] == os.pardir:
                url_dir = '/'
            else:
                url_dir = '/' + rel.replace(os.sep, '/') + '/'

            # Build the listing HTML.
            rows = ['<!DOCTYPE html>',
                    '<html><head><title>Audio Files Directory</title>',
                    '<style>',
                    'body { font-family: Arial, sans-serif; margin: 20px; }',
                    'table { border-collapse: collapse; width: 100%; }',
                    'th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }',
                    'tr:hover { background-color: #f5f5f5; }',
                    '.audio-file { color: #0066cc; }',
                    '</style></head><body>',
                    '<h1>Audio Files Directory</h1>',
                    '<table>',
                    '<tr><th>Name</th><th>Size</th><th>Type</th></tr>']
            # Link to the parent directory, except at the root.
            if url_dir != '/':
                parent = url_dir.rstrip('/').rsplit('/', 1)[0] + '/'
                parent_href = html.escape(quote(parent, errors='replace'))
                rows.append(f'<tr><td><a href="{parent_href}">..</a></td><td>-</td><td>Directory</td></tr>')
            # List the directory contents.
            for item in sorted(os.listdir(directory)):
                item_path = os.path.join(directory, item)
                is_dir = os.path.isdir(item_path)
                # Show only directories and audio files.
                if not is_dir and not self._is_allowed_audio(item):
                    continue
                if is_dir:
                    size = '-'
                else:
                    try:
                        size = f"{os.path.getsize(item_path):,} bytes"
                    except OSError:
                        # Broken symlink or file removed meanwhile: skip it.
                        continue
                item_type = 'Directory' if is_dir else 'Audio File'
                item_class = 'audio-file' if not is_dir else ''
                target = url_dir + item + ('/' if is_dir else '')
                # URL-quote the link and HTML-escape everything derived from
                # file names so that names cannot inject markup.
                href = html.escape(quote(target, errors='replace'))
                label = html.escape(item)
                rows.append(f'<tr><td><a href="{href}" class="{item_class}">{label}</a></td><td>{size}</td><td>{item_type}</td></tr>')
            rows.append('</table></body></html>')
            body = '\n'.join(rows).encode('utf-8', 'replace')
        except Exception as e:
            logging.error(f"Error generating directory listing: {str(e)}")
            self.send_error(500, "Error generating directory listing")
            return

        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)
def run_server(port, host="0.0.0.0"):
    """Start the audio file server and block until interrupted.

    Ensures ``<cwd>/audio`` exists, then serves it with ``FileHandler``
    until the process receives a KeyboardInterrupt (Ctrl+C).

    Args:
        port: TCP port to listen on.
        host: Interface address to bind to; defaults to all interfaces.
    """
    # Make sure the audio directory exists.
    audio_dir = os.path.join(os.getcwd(), 'audio')
    if not os.path.exists(audio_dir):
        # exist_ok covers another process creating it between check and call.
        os.makedirs(audio_dir, exist_ok=True)
        logging.info(f"Created audio directory at: {audio_dir}")

    # ThreadingHTTPServer enables address reuse (so a quick restart does not
    # fail with "Address already in use") and serves each request in its own
    # thread, so one long download does not block other clients.
    with http.server.ThreadingHTTPServer((host, port), FileHandler) as httpd:
        logging.info(f"Server started at http://{host}:{port}")
        logging.info(f"Local access: http://localhost:{port}")
        logging.info(f"Serving files from: {audio_dir}")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # The enclosing ``with`` closes the listening socket on exit.
            logging.info("Server stopped by user")
if __name__ == "__main__":
    # Script entry point: read the port from the command line and serve.
    cli = argparse.ArgumentParser(description='Audio files HTTP server')
    cli.add_argument(
        '-p', '--port',
        type=int,
        default=8000,
        help='Port to run the server on (default: 8000)',
    )
    options = cli.parse_args()
    run_server(options.port)