Python 调用海康 SDK
海康 SDK 下载:https://www.hikvision.com/cn/download_61.html
调用 SDK,及导包
import logging
import sys
import threading
import os
from ctypes import *
import datetime
import time
temp_dir = "temp_dir")
path = "HKLib/win64/"
so_list = []
# 获取所有的库文件到一个列表
def add_so(path, so_list):
    files = os.listdir(path)
    for file in files:
        if not os.path.isdir(path + file):
            if file.endswith(".dll"):
                so_list.append(path + file)
        else:
            add_so(path + file + "/", so_list)
add_so(path, so_list)
# lUserID = 0  # 用户ID具有唯一性,后续对设备的操作都需要通过此ID实现。如果只连接一个摄像头可以写这里
# m_lAlarmHandle = 0
# 执行 dll 方法
def callCpp(func_name, *args):
    for so_lib in so_list:
        try:
            lib = cdll.LoadLibrary(so_lib)
            try:
                value = eval("lib.%s" % func_name)(*args)
                logging.info("调用的库:" + so_lib)
                logging.info("执行成功,返回值:" + str(value))
                return value
            except:
                continue
        except:
            logging.info("库文件载入失败:" + so_lib)
            continue
    logging.error("没有找到接口!")
    return False定义结构体
# 设备参数结构体。
class LPNET_DVR_DEVICEINFO_V30(Structure):
    _fields_ = [
        ("sSerialNumber", c_byte * 48),
        ("byAlarmInPortNum", c_byte),
        ("byAlarmOutPortNum", c_byte),
        ("byDiskNum", c_byte),
        ("byDVRType", c_byte),
        ("byChanNum", c_byte),
        ("byStartChan", c_byte),
        ("byAudioChanNum", c_byte),
        ("byIPChanNum", c_byte),
        ("byZeroChanNum", c_byte),
        ("byMainProto", c_byte),
        ("bySubProto", c_byte),
        ("bySupport", c_byte),
        ("bySupport1", c_byte),
        ("bySupport2", c_byte),
        ("wDevType", c_uint16),
        ("bySupport3", c_byte),
        ("byMultiStreamProto", c_byte),
        ("byStartDChan", c_byte),
        ("byStartDTalkChan", c_byte),
        ("byHighDChanNum", c_byte),
        ("bySupport4", c_byte),
        ("byLanguageType", c_byte),
        ("byVoiceInChanNum", c_byte),
        ("byStartVoiceInChanNo", c_byte),
        ("byRes3", c_byte * 2),
        ("byMirrorChanNum", c_byte),
        ("wStartMirrorChanNo", c_uint16),
        ("byRes2", c_byte * 2)]
# 报警设备信息结构体
class NET_DVR_ALARMER(Structure):
    _fields_ = [
        ("byUserIDValid", c_byte),
        ("bySerialValid", c_byte),
        ("byVersionValid", c_byte),
        ("byDeviceNameValid", c_byte),
        ("byMacAddrValid", c_byte),
        ("byLinkPortValid", c_byte),
        ("byDeviceIPValid", c_byte),
        ("bySocketIPValid", c_byte),
        ("lUserID", c_long),
        ("sSerialNumber", c_byte * 48),
        ("dwDeviceVersion", c_uint32),
        ("sDeviceName", c_char * 32),
        ("byMacAddr", c_char * 6),
        ("wLinkPort", c_uint16),
        ("sDeviceIP", c_char * 128),
        ("sSocketIP", c_char * 128),
        ("byIpProtocol", c_byte),
        ("byRes2", c_byte * 11)
    ]
# 布防
class NET_DVR_SETUPALARM_PARAM(Structure):
    _fields_ = [
        ("dwSize", c_uint32),
        ("beLevel", c_byte),
        ("byAlarmInfoType", c_byte),
        ("byRetAlarmTypeV40", c_byte),
        ("byRetDevInfoVersion", c_byte),
        ("byRetVQDAlarmType", c_byte),
        ("byFaceAlarmDetection", c_byte),
        ("bySupport", c_byte),
        ("byBrokenNetHttp", c_byte),
        ("wTaskNo", c_uint16),
        ("byDeployType", c_byte),
        ("byRes1", c_byte * 3),
        ("byAlarmTypeURL", c_byte),
        ("byCustomCtrl", c_byte)
    ]
# 区域框参数结构体。
class NET_VCA_RECT(Structure):
    _fields_ = [
        ("fX", c_float),
        ("fY", c_float),
        ("fWidth", c_float),
        ("fHeight", c_float)
    ]
# 报警目标信息结构体。
class NET_VCA_TARGET_INFO(Structure):
    _fields_ = [
        ("dwID", c_uint32),
        ("struRect", NET_VCA_RECT),
        ("byRes", c_byte*4)
    ]
# IP地址结构体。
class NET_DVR_IPADDR(Structure):
    _fields_ = [
        ("sIpV4", c_char*16),
        ("sIpV6", c_byte*128)
    ]
# 前端设备信息结构体。
class NET_VCA_DEV_INFO(Structure):
    _fields_ = [
        ("struDevIP", NET_DVR_IPADDR),
        ("wPort", c_uint16),
        ("byChannel", c_byte),
        ("byIvmsChannel", c_byte)
    ]
# 人体属性参数结构体。
class NET_VCA_HUMAN_FEATURE(Structure):
    _fields_ = [
        ("byAgeGroup", c_byte),
        ("bySex", c_byte),
        ("byEyeGlass", c_byte),
        ("byAge", c_byte),
        ("byAgeDeviation", c_byte),
        ("byRes", c_byte*11)
    ]
# 人脸抓拍结果结构体。
class NET_VCA_FACESNAP_RESULT(Structure):
    _fields_ = [
        ("dwSize", c_uint32),
        ("dwRelativeTime", c_uint32),
        ("dwAbsTime", c_uint32),
        ("dwFacePicID", c_uint32),
        ("dwFaceScore", c_uint32),
        ("struTargetInfo", NET_VCA_TARGET_INFO),
        ("struRect", NET_VCA_RECT),
        ("struDevInfo", NET_VCA_DEV_INFO),
        ("dwFacePicLen", c_uint32),
        ("dwBackgroundPicLen", c_uint32),
        ("bySmart", c_byte),
        ("byAlarmEndMark", c_byte),
        ("byRepeatTimes", c_byte),
        ("byUploadEventDataType", c_byte),
        ("struFeature", NET_VCA_HUMAN_FEATURE),
        ("fStayDuration", c_float),
        ("sStorageIP", c_char*16),
        ("wStoragePort", c_uint16),
        ("wDevInfoIvmsChannelEx", c_uint16),
        ("byRes1", c_byte*15),
        ("byBrokenNetHttp", c_byte),
        ("pBuffer1", c_void_p),
        ("pBuffer2", c_void_p)
    ]SDK 初始化
def NET_DVR_Init():
    init_res = callCpp("NET_DVR_Init")  # SDK初始化
    if init_res:
        logging.info("SDK初始化成功")
        return True
    else:
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error("SDK初始化错误:" + str(error_info))
        return False用户注册设备,设置超时时间
def NET_DVR_Login_V30(sDVRIP = "192.168.1.1",wDVRPort = 8000,sUserName = "admin",sPassword = "admin"):
    set_overtime = callCpp("NET_DVR_SetConnectTime", 5000, 4)  # 设置超时
    if set_overtime:
        logging.info(sDVRIP + ", 设置超时时间成功")
    else:
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error(sDVRIP + ", 设置超时错误信息:" + str(error_info))
        return False
    # c++传递进去的是byte型数据,需要转成byte型传进去,否则会乱码
    sDVRIP_bytes = bytes(sDVRIP, "ascii")
    sUserName = bytes(sUserName, "ascii")
    sPassword = bytes(sPassword, "ascii")
    DeviceInfo = LPNET_DVR_DEVICEINFO_V30()
    DeviceInfoRef = byref(DeviceInfo)
    lUserID = callCpp("NET_DVR_Login_V30", sDVRIP_bytes, wDVRPort, sUserName, sPassword, DeviceInfoRef)
    logging.info(sDVRIP + ", 登录结果:" + str(lUserID))
    if lUserID == -1:  # -1表示失败,其他值表示返回的用户ID值。
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error(sDVRIP + ", 登录错误信息:" + str(error_info))
    return lUserID设置报警回调函数
# 回调函数接口
def MsgCallback(lCommand, net_dvr_alarm, pAlarmInfo, dwBufLen, pUser):
    # logging.info("回调函数执行,lcommand的值:"+str(lCommand))
    if lCommand == 0x1112:  # 人脸抓拍结果,NET_VCA_FACESNAP_RESULT 人脸抓拍结果结构体。
        ip = str(net_dvr_alarm.contents.sDeviceIP, "utf-8")
        timestamp = time.time()
        datetime_struct = datetime.datetime.fromtimestamp(timestamp)
        format_time = datetime_struct.strftime('%Y%m%d%H%M%S%f')
        struFaceSnap = NET_VCA_FACESNAP_RESULT()
        memmove(pointer(struFaceSnap), pAlarmInfo, sizeof(struFaceSnap))
        face = (c_ubyte*struFaceSnap.dwFacePicLen)()
        memmove(face, struFaceSnap.pBuffer1, struFaceSnap.dwFacePicLen)
        # print(ip+"_" + format_time+", dwFacePicID:"+str(struFaceSnap.dwFacePicID))  # 人脸图ID
        # print(ip+"_" + format_time+", byAlarmEndMark:"+str(struFaceSnap.byAlarmEndMark))
        if struFaceSnap.byAlarmEndMark == 1:  # 报警结束标记:0- 保留,1- 结束标记(该字段结合人脸ID字段使用,表示该ID对应的下报警结束,用于判断报警结束,提取识别图片数据中,清晰度最高的图片)
            with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_face.jpg"), "wb") as f:
                f.write(face)
            if struFaceSnap.pBuffer2:
                back = (c_ubyte*struFaceSnap.dwBackgroundPicLen)()
                memmove(back, struFaceSnap.pBuffer2, struFaceSnap.dwBackgroundPicLen)
                with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_back.jpg"), "wb") as f:
                    f.write(back)
    # print("结构体中的变量,byUserIDValid输出:"+str(net_dvr_alarm.contents.byUserIDValid))
    return True
def NET_DVR_SetDVRMessageCallBack_V31(sDVRIP, callBack):
    if callCpp("NET_DVR_SetDVRMessageCallBack_V31", callBack, None):
        logging.info(sDVRIP + ", 设置回调函数成功!")
        return True
    else:
        logging.error(sDVRIP + ", 回调函数设置错误:" + str(callCpp("NET_DVR_GetLastError")))
        return False
    # iListenHandle = callCpp("NET_DVR_StartListen_V30", sLocalIP,wLocalPort,CALLFUNC(MSGCallBack),pUserData)
# 设置回调,
# MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER),  c_void_p, c_uint32, c_void_p)
# callBack = MYCALLFUNC(MsgCallback)https://blog.csdn.net/sweibd/article/details/52679213
一个大坑:需要注意CMPRESULTFUNC(OnPyVideoAnalyzeResultCallback)这个指针函数是有自己的生存空间的,如果生存空间已过,会被释放,C 代码再回调的时候,就会使用一个过期指针。这里建议使用一个全局的 python 指针。
布防
def NET_DVR_SetupAlarmChan_V41(sDVRIP, lUserID):
    struAlarmParam = NET_DVR_SETUPALARM_PARAM()
    struAlarmParamSize = sizeof(struAlarmParam)
    logging.info(sDVRIP + ", 结构体大小:" + str(struAlarmParamSize))
    struAlarmParam.dwSize = struAlarmParamSize  # 结构体大小
    struAlarmParam.beLevel = 1  # 0 一级布防,1 二级布防
    # struAlarmParam.byAlarmInfoType = 1  # 智能交通设备有效
    struAlarmParam.byFaceAlarmDetection = 0  # 人脸侦测报警(人脸抓拍模块需要设置为 0)
    struAlarmParam_p = byref(struAlarmParam)
    m_lAlarmHandle = callCpp("NET_DVR_SetupAlarmChan_V41", lUserID, struAlarmParam_p)
    logging.info(sDVRIP + ", 布防结果:" + str(m_lAlarmHandle))
    if m_lAlarmHandle == -1:
        logging.error(sDVRIP + ", 错误信息:" + str(callCpp("NET_DVR_GetLastError")))
    else:
        logging.info(sDVRIP + ", 布防成功")
    return m_lAlarmHandle撤防,释放 SDK 资源,用户注销
# 撤防
def NET_DVR_CloseAlarmChan_V30(sDVRIP, m_lAlarmHandle):
    res = callCpp("NET_DVR_CloseAlarmChan_V30", m_lAlarmHandle)
    if res < 0:
        logging.error(sDVRIP + ", 撤防失败:" + str(callCpp("NET_DVR_GetLastError")))
        return False
    else:
        logging.info(sDVRIP + ", 撤防成功")
        return True
# 释放 SDK 资源
def NET_DVR_Cleanup():
    res = callCpp("NET_DVR_Cleanup")
    if res < 0:
        print("SDK资源释放失败:"+str(callCpp("NET_DVR_GetLastError")))
    else:
        print("SDK资源成功释放!!")
# 用户注销
def NET_DVR_Logout_V30(sDVRIP, lUserID):
    if callCpp("NET_DVR_Logout_V30", lUserID):
        logging.info(sDVRIP + ", 用户已经成功注销")
    else:
        logging.error(sDVRIP + ", 用户注销失败:" + str(callCpp("NET_DVR_GetLastError")))使用
from queue import Queue
picQueue = Queue()  # 阻塞线程用
class FACESNAP_RESULT_Thread (threading.Thread):
    def __init__(self, sDVRIP, wDVRPort, sUserName, sPassword):
        threading.Thread.__init__(self)
        self.sDVRIP = sDVRIP
        self.wDVRPort = wDVRPort
        self.sUserName = sUserName
        self.sPassword = sPassword
    def run(self):
        lUserID = NET_DVR_Login_V30(self.sDVRIP, self.wDVRPort, self.sUserName, self.sPassword)
        global threadsNum
        if lUserID != -1:
            # 设置回调
            MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER),  c_void_p, c_uint32, c_void_p)
            callBack = MYCALLFUNC(MsgCallback)
            if NET_DVR_SetDVRMessageCallBack_V31(self.sDVRIP, callBack):
                m_lAlarmHandle = NET_DVR_SetupAlarmChan_V41(self.sDVRIP, lUserID)
                if m_lAlarmHandle != -1:
                    picQueue.get()
                    NET_DVR_CloseAlarmChan_V30(self.sDVRIP, m_lAlarmHandle)
                    NET_DVR_Logout_V30(self.sDVRIP, lUserID)
                    global lock
                    lock.acquire()
                    threadsNum -= 1
                    if threadsNum == 0:
                        picQueue.put(1)  # 主线程退出
                    else:
                        picQueue.put(2)  # 子线程退出
                    lock.release()
                    return
        threadsNum -= 1
        if threadsNum < 1:
            picQueue.put(1)
threadsNum = 0  # 相机数,子线程数
lock = threading.Lock()
def main():
    NET_DVR_Init()  # 初始化 SDK
    cameraTableValues=[["ip","port","username","password"], ["ip","port","username","password"]]
    threadsNum = len(cameraTableValues)
    threads = []  # 每个相机不同的线程
    for camera in cameraTableValues:
        t = FACESNAP_RESULT_Thread(camera[0], camera[1], camera[2], camera[3])
        threads.append(t)
    for th in threads:
        th.start()
    # 可以在回调函数中加入一定的逻辑,退出线程,picQueue.put(2)
    while 1:  # 阻塞线程,可以一直抓拍
        if picQueue.get() == 1:  # 代表子线程结束,可以退出了
            NET_DVR_Cleanup()
            break
        else:
            picQueue.put(2)
if __name__ == '__main__':
    main()抓图
# 抓图数据结构体
class NET_DVR_JPEGPARA(ctypes.Structure):
    _fields_ = [
        ("wPicSize", ctypes.c_ushort),
        ("wPicQuality", ctypes.c_ushort)]
# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture():
    sJpegPicFileName = bytes("pytest.jpg", "ascii")
    lpJpegPara=NET_DVR_JPEGPARA()
    lpJpegPara.wPicSize=0
    lpJpegPara.wPicQuality=0
    if (callCpp("NET_DVR_CaptureJPEGPicture", lUserID, lChannel, ctypes.byref(lpJpegPara), sJpegPicFileName)== False):
        error_info = callCpp("NET_DVR_GetLastError")
        print("抓图失败:" + str(error_info))
    else:
        print("抓图成功")总结
该程序使用的海康 SDK 版本为:CH-HCNetSDKV5.3.5.46_build20180518_Win64。主要是实现了【人脸抓拍模块流程】,获得人脸图及背景图。
参考
Windows64下通过python调用海康SDK实现登入、预览、抓图、光学变倍、相机激活、区域聚焦、区域曝光功能
python 调用海康SDK,登录,设置回调,布防,撤防,注销以及释放SDK
python如何调用C, 如何注册成C的回调函数(python后台程序常用方法)
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 bin07280@qq.com
文章标题:Python 调用海康 SDK
文章字数:2.2k
本文作者:Bin
发布时间:2018-09-26, 16:06:22
最后更新:2019-08-06, 00:07:35
原始链接:http://coolview.github.io/2018/09/26/Python/Python%20%E8%B0%83%E7%94%A8%E6%B5%B7%E5%BA%B7%20SDK/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。