首页 » 信息技术 »

Python实现异步多线程TCP服务器

2019年4月4日 / 31次阅读
Python

  • 打开支付宝,搜索“ 529018372 ”,领取专属红包!每日支付每日领。

最近用Python写了一个异步多线程TCP服务器框架,用于公司项目消息收发的测试验证。供参考。

下面的代码,可以在局域网类支持多个客户端的连接。唯一的一个问题是,除了通过判断接收数据长度为0之外,还有什么别的更好的方法判断TCP的连接已经中断了?

server.daemon_threads = True,表示可以通过Ctrl+C来关闭服务器进程以及其派生的所有连接线程,这个值默认是False。

#!/usr/bin/env python3
import socket
import socketserver
import time
import datetime
import random


TIMEOUT = 5
SUCCNOLEN = b'\x80\x00'


def printhex(bstr):
    """print byte string in hex format"""
    for i in range(len(bstr)):
        print('0x%02X'%bstr[i], end=' ')


class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """
    
    def handle(self):

        def _reply(bstr, show_msg):
            """
            bstr, byte string about to reply
            show_msg, message string about to print """
            # ramdom delay 0 to 1 second
            time.sleep(random.random())
            print('--> ['\
                      +datetime.datetime.now().strftime('%H:%M:%S.%f')+']',
                      end= ' ')
            print(f'reply to {client_ip}:{client_port} with '\
                      +str(len(bstr))+' bytes:', end=' ')
            printhex(bstr)
            print()
            print('# reply msg:', show_msg)
            try:
                self.request.sendall(bstr)
            except:
                print(f'* sending exception with {client_ip}:{client_port},'
                      f' connection thread may be down already')

        client_ip = self.client_address[0]
        client_port = self.client_address[1]
        print(f'* new connection is established from {client_ip}:{client_port}')

        while True:
            try:
                self.data = self.request.recv(1024).strip()
            except:
                print(f'* receiving exception with {client_ip}:{client_port},'
                      f' connection thread may be down already')
                return

            if len(self.data) != 0:
                no_data_time = 0
                print('--> ['+datetime.datetime.now().strftime('%H:%M:%S.%f')+']',
                      end= ' ')
                print(f'from {client_ip}:{client_port} received', end=' ')
                print(str(len(self.data)) + ' bytes:', end=' ')
                printhex(self.data)
                print()
                
                # 001 set 10G BE mode
                if self.data == b'\xF0\x03\x17\x42\x00':
                    print('# received msg: set 10G BE mode')
                    _reply(SUCCNOLEN, 'ok')
             
                # 002 read 10G BE mode
                if self.data == b'\xF0\x01\x18':
                    print('# receive msg: read 10G BE mode')
                    _reply(b'\x80\x02\x42\x00', 'ok')

                # 003 write 10G SFP SR equilibrium value (eq),
                # the second 0x0D means write it for ALL slots automatically
                # after reentry the BE mode for slots (not reboot board),
                # if uses 0x0C, the eq value will set to default for all slots.
                if self.data == b'\xF1\x06\x0D\x0D\x00\x54\x81\x40':
                    print('# receive msg: write 10G SFP SR eq value')
                    _reply(SUCCNOLEN, 'ok')

                # 003 write 10G SFP LR equilibrium value (eq),
                # the second 0x0D means write it for ALL slots automatically
                # after reentry the BE mode for slots (not reboot board),
                # if uses 0x0C, the eq value will set to default for all slots.
                if self.data == b'\xF1\x06\x0D\x0D\x00\x54\x11\x40':
                    print('# receive msg: write 10G SFP LR eq value')
                    _reply(SUCCNOLEN, 'ok')

                # 005 in 10G BE mode, inquire modules info
                if self.data == b'\xF0\x03\x30\x00\x00':
                    print('# receive msg: in 10G BE mode, inquire modules info')
                    # here should reply 52 bytes
                    bstr = b'\x80\x32'
                    for i in range(50):
                        j = random.randint(0,2)
                        if j == 0: bstr += b'\x03'
                        if j == 1: bstr += b'\x82'
                        if j == 2: bstr += b'\x1A'
                    _reply(bstr, 'ok, report modules info in random')


            else:
                no_data_time += 1
                if no_data_time > TIMEOUT:
                    print(f'* no message flying for {TIMEOUT}s, '
                           'connection thread shutdown for '
                          f'{client_ip}:{client_port}, but ATTS is still '
                           'running...')
                    return
                time.sleep(1)  # no data round, sleep 1 second

        return


if __name__ == "__main__":
    HOST = socket.gethostbyname(socket.gethostname())
    PORT = 56789

    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        server.daemon_threads = True  # make Ctrl+C works
        print(f'* ATTS (ATCA Testing Tcp Server) has started at {HOST}:{PORT}')
        server.serve_forever()
        

本文链接:https://www.maixj.net/ict/python-tcp-20859

相关文章

留言区

《Python实现异步多线程TCP服务器》有3条留言

  • 麦新杰

    使用官方的logging模块,这个模块宣称是thread safety的 []

  • 麦新杰

    print不是thread safety,要使用sys.stdout.write来代替,原因不详,可能是因为后者在Python内部就是一个原子操作吧。 []

    • 麦新杰

      网上看到的信息,都是针对python2的,不清楚python3是否也是这样。 []


前一篇:
后一篇:
推一篇:可靠正规,长期稳定,网络兼职项目!!

栏目精选

云上小悟,麦新杰的独立博客

Ctrl+D 收藏本页

栏目


©Copyright 麦新杰 Since 2014 云上小悟独立博客版权所有 备案号:苏ICP备14045477号-1。云上小悟网站部分内容来源于网络,转载目的是为了整合信息,收藏学习,服务大家,有些转载内容也难以判断是否有侵权问题,如果侵犯了您的权益,请及时联系站长,我会立即删除。

网站二维码
go to top