Python 调用海康 SDK

  1. 调用 SDK,及导包
  2. 定义结构体
  3. SDK 初始化
  4. 用户注册设备,设置超时时间
  5. 设置报警回调函数
  6. 布防
  7. 撤防,释放 SDK 资源,用户注销
  8. 使用
  9. 抓图
  10. 总结
  11. 参考

海康 SDK 下载:https://www.hikvision.com/cn/download_61.html

调用 SDK,及导包

import logging
import sys
import threading
import os
from ctypes import *
import datetime
import time

temp_dir = "temp_dir")

path = "HKLib/win64/"
so_list = []

# 获取所有的库文件到一个列表
def add_so(path, so_list):
    files = os.listdir(path)
    for file in files:
        if not os.path.isdir(path + file):
            if file.endswith(".dll"):
                so_list.append(path + file)
        else:
            add_so(path + file + "/", so_list)

add_so(path, so_list)

# lUserID = 0  # 用户ID具有唯一性,后续对设备的操作都需要通过此ID实现。如果只连接一个摄像头可以写这里
# m_lAlarmHandle = 0

# 执行 dll 方法
def callCpp(func_name, *args):
    for so_lib in so_list:
        try:
            lib = cdll.LoadLibrary(so_lib)
            try:
                value = eval("lib.%s" % func_name)(*args)
                logging.info("调用的库:" + so_lib)
                logging.info("执行成功,返回值:" + str(value))
                return value
            except:
                continue
        except:
            logging.info("库文件载入失败:" + so_lib)
            continue
    logging.error("没有找到接口!")
    return False

定义结构体

# 设备参数结构体。
class LPNET_DVR_DEVICEINFO_V30(Structure):
    _fields_ = [
        ("sSerialNumber", c_byte * 48),
        ("byAlarmInPortNum", c_byte),
        ("byAlarmOutPortNum", c_byte),
        ("byDiskNum", c_byte),
        ("byDVRType", c_byte),
        ("byChanNum", c_byte),
        ("byStartChan", c_byte),
        ("byAudioChanNum", c_byte),
        ("byIPChanNum", c_byte),
        ("byZeroChanNum", c_byte),
        ("byMainProto", c_byte),
        ("bySubProto", c_byte),
        ("bySupport", c_byte),
        ("bySupport1", c_byte),
        ("bySupport2", c_byte),
        ("wDevType", c_uint16),
        ("bySupport3", c_byte),
        ("byMultiStreamProto", c_byte),
        ("byStartDChan", c_byte),
        ("byStartDTalkChan", c_byte),
        ("byHighDChanNum", c_byte),
        ("bySupport4", c_byte),
        ("byLanguageType", c_byte),
        ("byVoiceInChanNum", c_byte),
        ("byStartVoiceInChanNo", c_byte),
        ("byRes3", c_byte * 2),
        ("byMirrorChanNum", c_byte),
        ("wStartMirrorChanNo", c_uint16),
        ("byRes2", c_byte * 2)]


# 报警设备信息结构体
class NET_DVR_ALARMER(Structure):
    _fields_ = [
        ("byUserIDValid", c_byte),
        ("bySerialValid", c_byte),
        ("byVersionValid", c_byte),
        ("byDeviceNameValid", c_byte),
        ("byMacAddrValid", c_byte),
        ("byLinkPortValid", c_byte),
        ("byDeviceIPValid", c_byte),
        ("bySocketIPValid", c_byte),
        ("lUserID", c_long),
        ("sSerialNumber", c_byte * 48),
        ("dwDeviceVersion", c_uint32),
        ("sDeviceName", c_char * 32),
        ("byMacAddr", c_char * 6),
        ("wLinkPort", c_uint16),
        ("sDeviceIP", c_char * 128),
        ("sSocketIP", c_char * 128),
        ("byIpProtocol", c_byte),
        ("byRes2", c_byte * 11)
    ]


# 布防
class NET_DVR_SETUPALARM_PARAM(Structure):
    _fields_ = [
        ("dwSize", c_uint32),
        ("beLevel", c_byte),
        ("byAlarmInfoType", c_byte),
        ("byRetAlarmTypeV40", c_byte),
        ("byRetDevInfoVersion", c_byte),
        ("byRetVQDAlarmType", c_byte),
        ("byFaceAlarmDetection", c_byte),
        ("bySupport", c_byte),
        ("byBrokenNetHttp", c_byte),
        ("wTaskNo", c_uint16),
        ("byDeployType", c_byte),
        ("byRes1", c_byte * 3),
        ("byAlarmTypeURL", c_byte),
        ("byCustomCtrl", c_byte)
    ]


# 区域框参数结构体。
class NET_VCA_RECT(Structure):
    _fields_ = [
        ("fX", c_float),
        ("fY", c_float),
        ("fWidth", c_float),
        ("fHeight", c_float)
    ]


# 报警目标信息结构体。
class NET_VCA_TARGET_INFO(Structure):
    _fields_ = [
        ("dwID", c_uint32),
        ("struRect", NET_VCA_RECT),
        ("byRes", c_byte*4)
    ]


# IP地址结构体。
class NET_DVR_IPADDR(Structure):
    _fields_ = [
        ("sIpV4", c_char*16),
        ("sIpV6", c_byte*128)
    ]

# 前端设备信息结构体。
class NET_VCA_DEV_INFO(Structure):
    _fields_ = [
        ("struDevIP", NET_DVR_IPADDR),
        ("wPort", c_uint16),
        ("byChannel", c_byte),
        ("byIvmsChannel", c_byte)
    ]


# 人体属性参数结构体。
class NET_VCA_HUMAN_FEATURE(Structure):
    _fields_ = [
        ("byAgeGroup", c_byte),
        ("bySex", c_byte),
        ("byEyeGlass", c_byte),
        ("byAge", c_byte),
        ("byAgeDeviation", c_byte),
        ("byRes", c_byte*11)
    ]


# 人脸抓拍结果结构体。
class NET_VCA_FACESNAP_RESULT(Structure):
    _fields_ = [
        ("dwSize", c_uint32),
        ("dwRelativeTime", c_uint32),
        ("dwAbsTime", c_uint32),
        ("dwFacePicID", c_uint32),
        ("dwFaceScore", c_uint32),
        ("struTargetInfo", NET_VCA_TARGET_INFO),
        ("struRect", NET_VCA_RECT),
        ("struDevInfo", NET_VCA_DEV_INFO),
        ("dwFacePicLen", c_uint32),
        ("dwBackgroundPicLen", c_uint32),
        ("bySmart", c_byte),
        ("byAlarmEndMark", c_byte),
        ("byRepeatTimes", c_byte),
        ("byUploadEventDataType", c_byte),
        ("struFeature", NET_VCA_HUMAN_FEATURE),
        ("fStayDuration", c_float),
        ("sStorageIP", c_char*16),
        ("wStoragePort", c_uint16),
        ("wDevInfoIvmsChannelEx", c_uint16),
        ("byRes1", c_byte*15),
        ("byBrokenNetHttp", c_byte),
        ("pBuffer1", c_void_p),
        ("pBuffer2", c_void_p)
    ]

SDK 初始化

def NET_DVR_Init():
    init_res = callCpp("NET_DVR_Init")  # SDK初始化
    if init_res:
        logging.info("SDK初始化成功")
        return True
    else:
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error("SDK初始化错误:" + str(error_info))
        return False

用户注册设备,设置超时时间

def NET_DVR_Login_V30(sDVRIP = "192.168.1.1",wDVRPort = 8000,sUserName = "admin",sPassword = "admin"):

    set_overtime = callCpp("NET_DVR_SetConnectTime", 5000, 4)  # 设置超时
    if set_overtime:
        logging.info(sDVRIP + ", 设置超时时间成功")
    else:
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error(sDVRIP + ", 设置超时错误信息:" + str(error_info))
        return False

    # c++传递进去的是byte型数据,需要转成byte型传进去,否则会乱码
    sDVRIP_bytes = bytes(sDVRIP, "ascii")
    sUserName = bytes(sUserName, "ascii")
    sPassword = bytes(sPassword, "ascii")
    DeviceInfo = LPNET_DVR_DEVICEINFO_V30()
    DeviceInfoRef = byref(DeviceInfo)
    lUserID = callCpp("NET_DVR_Login_V30", sDVRIP_bytes, wDVRPort, sUserName, sPassword, DeviceInfoRef)
    logging.info(sDVRIP + ", 登录结果:" + str(lUserID))
    if lUserID == -1:  # -1表示失败,其他值表示返回的用户ID值。
        error_info = callCpp("NET_DVR_GetLastError")
        logging.error(sDVRIP + ", 登录错误信息:" + str(error_info))

    return lUserID

设置报警回调函数

# 回调函数接口
def MsgCallback(lCommand, net_dvr_alarm, pAlarmInfo, dwBufLen, pUser):
    # logging.info("回调函数执行,lcommand的值:"+str(lCommand))
    if lCommand == 0x1112:  # 人脸抓拍结果,NET_VCA_FACESNAP_RESULT 人脸抓拍结果结构体。

        ip = str(net_dvr_alarm.contents.sDeviceIP, "utf-8")
        timestamp = time.time()
        datetime_struct = datetime.datetime.fromtimestamp(timestamp)
        format_time = datetime_struct.strftime('%Y%m%d%H%M%S%f')

        struFaceSnap = NET_VCA_FACESNAP_RESULT()
        memmove(pointer(struFaceSnap), pAlarmInfo, sizeof(struFaceSnap))
        face = (c_ubyte*struFaceSnap.dwFacePicLen)()
        memmove(face, struFaceSnap.pBuffer1, struFaceSnap.dwFacePicLen)
        # print(ip+"_" + format_time+", dwFacePicID:"+str(struFaceSnap.dwFacePicID))  # 人脸图ID
        # print(ip+"_" + format_time+", byAlarmEndMark:"+str(struFaceSnap.byAlarmEndMark))
        if struFaceSnap.byAlarmEndMark == 1:  # 报警结束标记:0- 保留,1- 结束标记(该字段结合人脸ID字段使用,表示该ID对应的下报警结束,用于判断报警结束,提取识别图片数据中,清晰度最高的图片)
            with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_face.jpg"), "wb") as f:
                f.write(face)

            if struFaceSnap.pBuffer2:
                back = (c_ubyte*struFaceSnap.dwBackgroundPicLen)()
                memmove(back, struFaceSnap.pBuffer2, struFaceSnap.dwBackgroundPicLen)
                with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_back.jpg"), "wb") as f:
                    f.write(back)

    # print("结构体中的变量,byUserIDValid输出:"+str(net_dvr_alarm.contents.byUserIDValid))
    return True


def NET_DVR_SetDVRMessageCallBack_V31(sDVRIP, callBack):
    if callCpp("NET_DVR_SetDVRMessageCallBack_V31", callBack, None):
        logging.info(sDVRIP + ", 设置回调函数成功!")
        return True
    else:
        logging.error(sDVRIP + ", 回调函数设置错误:" + str(callCpp("NET_DVR_GetLastError")))
        return False
    # iListenHandle = callCpp("NET_DVR_StartListen_V30", sLocalIP,wLocalPort,CALLFUNC(MSGCallBack),pUserData)


# 设置回调,
# MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER),  c_void_p, c_uint32, c_void_p)
# callBack = MYCALLFUNC(MsgCallback)

https://blog.csdn.net/sweibd/article/details/52679213
一个大坑:需要注意 CMPRESULTFUNC(OnPyVideoAnalyzeResultCallback) 这个指针函数是有自己的生存空间的,如果生存空间已过,会被释放,C 代码再回调的时候,就会使用一个过期指针。

这里建议使用一个全局的 python 指针。

布防

def NET_DVR_SetupAlarmChan_V41(sDVRIP, lUserID):
    struAlarmParam = NET_DVR_SETUPALARM_PARAM()
    struAlarmParamSize = sizeof(struAlarmParam)
    logging.info(sDVRIP + ", 结构体大小:" + str(struAlarmParamSize))
    struAlarmParam.dwSize = struAlarmParamSize  # 结构体大小
    struAlarmParam.beLevel = 1  # 0 一级布防,1 二级布防
    # struAlarmParam.byAlarmInfoType = 1  # 智能交通设备有效
    struAlarmParam.byFaceAlarmDetection = 0  # 人脸侦测报警(人脸抓拍模块需要设置为 0)
    struAlarmParam_p = byref(struAlarmParam)
    m_lAlarmHandle = callCpp("NET_DVR_SetupAlarmChan_V41", lUserID, struAlarmParam_p)
    logging.info(sDVRIP + ", 布防结果:" + str(m_lAlarmHandle))
    if m_lAlarmHandle == -1:
        logging.error(sDVRIP + ", 错误信息:" + str(callCpp("NET_DVR_GetLastError")))
    else:
        logging.info(sDVRIP + ", 布防成功")

    return m_lAlarmHandle

撤防,释放 SDK 资源,用户注销

# 撤防
def NET_DVR_CloseAlarmChan_V30(sDVRIP, m_lAlarmHandle):
    res = callCpp("NET_DVR_CloseAlarmChan_V30", m_lAlarmHandle)
    if res < 0:
        logging.error(sDVRIP + ", 撤防失败:" + str(callCpp("NET_DVR_GetLastError")))
        return False
    else:
        logging.info(sDVRIP + ", 撤防成功")
        return True


# 释放 SDK 资源
def NET_DVR_Cleanup():
    res = callCpp("NET_DVR_Cleanup")
    if res < 0:
        print("SDK资源释放失败:"+str(callCpp("NET_DVR_GetLastError")))
    else:
        print("SDK资源成功释放!!")


# 用户注销
def NET_DVR_Logout_V30(sDVRIP, lUserID):
    if callCpp("NET_DVR_Logout_V30", lUserID):
        logging.info(sDVRIP + ", 用户已经成功注销")
    else:
        logging.error(sDVRIP + ", 用户注销失败:" + str(callCpp("NET_DVR_GetLastError")))

使用

from queue import Queue

picQueue = Queue()  # 阻塞线程用

class FACESNAP_RESULT_Thread (threading.Thread):
    def __init__(self, sDVRIP, wDVRPort, sUserName, sPassword):
        threading.Thread.__init__(self)
        self.sDVRIP = sDVRIP
        self.wDVRPort = wDVRPort
        self.sUserName = sUserName
        self.sPassword = sPassword

    def run(self):
        lUserID = NET_DVR_Login_V30(self.sDVRIP, self.wDVRPort, self.sUserName, self.sPassword)
        global threadsNum
        if lUserID != -1:
            # 设置回调
            MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER),  c_void_p, c_uint32, c_void_p)
            callBack = MYCALLFUNC(MsgCallback)
            if NET_DVR_SetDVRMessageCallBack_V31(self.sDVRIP, callBack):
                m_lAlarmHandle = NET_DVR_SetupAlarmChan_V41(self.sDVRIP, lUserID)
                if m_lAlarmHandle != -1:
                    picQueue.get()

                    NET_DVR_CloseAlarmChan_V30(self.sDVRIP, m_lAlarmHandle)
                    NET_DVR_Logout_V30(self.sDVRIP, lUserID)

                    global lock
                    lock.acquire()
                    threadsNum -= 1
                    if threadsNum == 0:
                        picQueue.put(1)  # 主线程退出
                    else:
                        picQueue.put(2)  # 子线程退出
                    lock.release()
                    return

        threadsNum -= 1
        if threadsNum < 1:
            picQueue.put(1)


threadsNum = 0  # 相机数,子线程数
lock = threading.Lock()
def main():
    NET_DVR_Init()  # 初始化 SDK
    cameraTableValues=[["ip","port","username","password"], ["ip","port","username","password"]]
    threadsNum = len(cameraTableValues)
    threads = []  # 每个相机不同的线程
    for camera in cameraTableValues:
        t = FACESNAP_RESULT_Thread(camera[0], camera[1], camera[2], camera[3])
        threads.append(t)

    for th in threads:
        th.start()

    # 可以在回调函数中加入一定的逻辑,退出线程,picQueue.put(2)
    while 1:  # 阻塞线程,可以一直抓拍
        if picQueue.get() == 1:  # 代表子线程结束,可以退出了
            NET_DVR_Cleanup()
            break
        else:
            picQueue.put(2)


if __name__ == '__main__':
    main()

抓图

# 抓图数据结构体
class NET_DVR_JPEGPARA(ctypes.Structure):
    _fields_ = [
        ("wPicSize", ctypes.c_ushort),
        ("wPicQuality", ctypes.c_ushort)]

# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture():
    sJpegPicFileName = bytes("pytest.jpg", "ascii")
    lpJpegPara=NET_DVR_JPEGPARA()
    lpJpegPara.wPicSize=0
    lpJpegPara.wPicQuality=0
    if (callCpp("NET_DVR_CaptureJPEGPicture", lUserID, lChannel, ctypes.byref(lpJpegPara), sJpegPicFileName)== False):
        error_info = callCpp("NET_DVR_GetLastError")
        print("抓图失败:" + str(error_info))
    else:
        print("抓图成功")

总结

该程序使用的海康 SDK 版本为:CH-HCNetSDKV5.3.5.46_build20180518_Win64。主要是实现了【人脸抓拍模块流程】,获得人脸图及背景图。

参考

Windows64下通过python调用海康SDK实现登入、预览、抓图、光学变倍、相机激活、区域聚焦、区域曝光功能
python 调用海康SDK,登录,设置回调,布防,撤防,注销以及释放SDK
python如何调用C, 如何注册成C的回调函数(python后台程序常用方法)


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 bin07280@qq.com

文章标题:Python 调用海康 SDK

文章字数:2.2k

本文作者:Bin

发布时间:2018-09-26, 16:06:22

最后更新:2019-08-06, 00:07:35

原始链接:http://coolview.github.io/2018/09/26/Python/Python%20%E8%B0%83%E7%94%A8%E6%B5%B7%E5%BA%B7%20SDK/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录