Python 调用海康 SDK
海康 SDK 下载:https://www.hikvision.com/cn/download_61.html
调用 SDK,及导包
import logging
import sys
import threading
import os
from ctypes import *
import datetime
import time
temp_dir = "temp_dir")
path = "HKLib/win64/"
so_list = []
# 获取所有的库文件到一个列表
def add_so(path, so_list):
files = os.listdir(path)
for file in files:
if not os.path.isdir(path + file):
if file.endswith(".dll"):
so_list.append(path + file)
else:
add_so(path + file + "/", so_list)
add_so(path, so_list)
# lUserID = 0 # 用户ID具有唯一性,后续对设备的操作都需要通过此ID实现。如果只连接一个摄像头可以写这里
# m_lAlarmHandle = 0
# 执行 dll 方法
def callCpp(func_name, *args):
for so_lib in so_list:
try:
lib = cdll.LoadLibrary(so_lib)
try:
value = eval("lib.%s" % func_name)(*args)
logging.info("调用的库:" + so_lib)
logging.info("执行成功,返回值:" + str(value))
return value
except:
continue
except:
logging.info("库文件载入失败:" + so_lib)
continue
logging.error("没有找到接口!")
return False
定义结构体
# 设备参数结构体。
class LPNET_DVR_DEVICEINFO_V30(Structure):
_fields_ = [
("sSerialNumber", c_byte * 48),
("byAlarmInPortNum", c_byte),
("byAlarmOutPortNum", c_byte),
("byDiskNum", c_byte),
("byDVRType", c_byte),
("byChanNum", c_byte),
("byStartChan", c_byte),
("byAudioChanNum", c_byte),
("byIPChanNum", c_byte),
("byZeroChanNum", c_byte),
("byMainProto", c_byte),
("bySubProto", c_byte),
("bySupport", c_byte),
("bySupport1", c_byte),
("bySupport2", c_byte),
("wDevType", c_uint16),
("bySupport3", c_byte),
("byMultiStreamProto", c_byte),
("byStartDChan", c_byte),
("byStartDTalkChan", c_byte),
("byHighDChanNum", c_byte),
("bySupport4", c_byte),
("byLanguageType", c_byte),
("byVoiceInChanNum", c_byte),
("byStartVoiceInChanNo", c_byte),
("byRes3", c_byte * 2),
("byMirrorChanNum", c_byte),
("wStartMirrorChanNo", c_uint16),
("byRes2", c_byte * 2)]
# 报警设备信息结构体
class NET_DVR_ALARMER(Structure):
_fields_ = [
("byUserIDValid", c_byte),
("bySerialValid", c_byte),
("byVersionValid", c_byte),
("byDeviceNameValid", c_byte),
("byMacAddrValid", c_byte),
("byLinkPortValid", c_byte),
("byDeviceIPValid", c_byte),
("bySocketIPValid", c_byte),
("lUserID", c_long),
("sSerialNumber", c_byte * 48),
("dwDeviceVersion", c_uint32),
("sDeviceName", c_char * 32),
("byMacAddr", c_char * 6),
("wLinkPort", c_uint16),
("sDeviceIP", c_char * 128),
("sSocketIP", c_char * 128),
("byIpProtocol", c_byte),
("byRes2", c_byte * 11)
]
# 布防
class NET_DVR_SETUPALARM_PARAM(Structure):
_fields_ = [
("dwSize", c_uint32),
("beLevel", c_byte),
("byAlarmInfoType", c_byte),
("byRetAlarmTypeV40", c_byte),
("byRetDevInfoVersion", c_byte),
("byRetVQDAlarmType", c_byte),
("byFaceAlarmDetection", c_byte),
("bySupport", c_byte),
("byBrokenNetHttp", c_byte),
("wTaskNo", c_uint16),
("byDeployType", c_byte),
("byRes1", c_byte * 3),
("byAlarmTypeURL", c_byte),
("byCustomCtrl", c_byte)
]
# 区域框参数结构体。
class NET_VCA_RECT(Structure):
_fields_ = [
("fX", c_float),
("fY", c_float),
("fWidth", c_float),
("fHeight", c_float)
]
# 报警目标信息结构体。
class NET_VCA_TARGET_INFO(Structure):
_fields_ = [
("dwID", c_uint32),
("struRect", NET_VCA_RECT),
("byRes", c_byte*4)
]
# IP地址结构体。
class NET_DVR_IPADDR(Structure):
_fields_ = [
("sIpV4", c_char*16),
("sIpV6", c_byte*128)
]
# 前端设备信息结构体。
class NET_VCA_DEV_INFO(Structure):
_fields_ = [
("struDevIP", NET_DVR_IPADDR),
("wPort", c_uint16),
("byChannel", c_byte),
("byIvmsChannel", c_byte)
]
# 人体属性参数结构体。
class NET_VCA_HUMAN_FEATURE(Structure):
_fields_ = [
("byAgeGroup", c_byte),
("bySex", c_byte),
("byEyeGlass", c_byte),
("byAge", c_byte),
("byAgeDeviation", c_byte),
("byRes", c_byte*11)
]
# 人脸抓拍结果结构体。
class NET_VCA_FACESNAP_RESULT(Structure):
_fields_ = [
("dwSize", c_uint32),
("dwRelativeTime", c_uint32),
("dwAbsTime", c_uint32),
("dwFacePicID", c_uint32),
("dwFaceScore", c_uint32),
("struTargetInfo", NET_VCA_TARGET_INFO),
("struRect", NET_VCA_RECT),
("struDevInfo", NET_VCA_DEV_INFO),
("dwFacePicLen", c_uint32),
("dwBackgroundPicLen", c_uint32),
("bySmart", c_byte),
("byAlarmEndMark", c_byte),
("byRepeatTimes", c_byte),
("byUploadEventDataType", c_byte),
("struFeature", NET_VCA_HUMAN_FEATURE),
("fStayDuration", c_float),
("sStorageIP", c_char*16),
("wStoragePort", c_uint16),
("wDevInfoIvmsChannelEx", c_uint16),
("byRes1", c_byte*15),
("byBrokenNetHttp", c_byte),
("pBuffer1", c_void_p),
("pBuffer2", c_void_p)
]
SDK 初始化
def NET_DVR_Init():
init_res = callCpp("NET_DVR_Init") # SDK初始化
if init_res:
logging.info("SDK初始化成功")
return True
else:
error_info = callCpp("NET_DVR_GetLastError")
logging.error("SDK初始化错误:" + str(error_info))
return False
用户注册设备,设置超时时间
def NET_DVR_Login_V30(sDVRIP = "192.168.1.1",wDVRPort = 8000,sUserName = "admin",sPassword = "admin"):
set_overtime = callCpp("NET_DVR_SetConnectTime", 5000, 4) # 设置超时
if set_overtime:
logging.info(sDVRIP + ", 设置超时时间成功")
else:
error_info = callCpp("NET_DVR_GetLastError")
logging.error(sDVRIP + ", 设置超时错误信息:" + str(error_info))
return False
# c++传递进去的是byte型数据,需要转成byte型传进去,否则会乱码
sDVRIP_bytes = bytes(sDVRIP, "ascii")
sUserName = bytes(sUserName, "ascii")
sPassword = bytes(sPassword, "ascii")
DeviceInfo = LPNET_DVR_DEVICEINFO_V30()
DeviceInfoRef = byref(DeviceInfo)
lUserID = callCpp("NET_DVR_Login_V30", sDVRIP_bytes, wDVRPort, sUserName, sPassword, DeviceInfoRef)
logging.info(sDVRIP + ", 登录结果:" + str(lUserID))
if lUserID == -1: # -1表示失败,其他值表示返回的用户ID值。
error_info = callCpp("NET_DVR_GetLastError")
logging.error(sDVRIP + ", 登录错误信息:" + str(error_info))
return lUserID
设置报警回调函数
# 回调函数接口
def MsgCallback(lCommand, net_dvr_alarm, pAlarmInfo, dwBufLen, pUser):
# logging.info("回调函数执行,lcommand的值:"+str(lCommand))
if lCommand == 0x1112: # 人脸抓拍结果,NET_VCA_FACESNAP_RESULT 人脸抓拍结果结构体。
ip = str(net_dvr_alarm.contents.sDeviceIP, "utf-8")
timestamp = time.time()
datetime_struct = datetime.datetime.fromtimestamp(timestamp)
format_time = datetime_struct.strftime('%Y%m%d%H%M%S%f')
struFaceSnap = NET_VCA_FACESNAP_RESULT()
memmove(pointer(struFaceSnap), pAlarmInfo, sizeof(struFaceSnap))
face = (c_ubyte*struFaceSnap.dwFacePicLen)()
memmove(face, struFaceSnap.pBuffer1, struFaceSnap.dwFacePicLen)
# print(ip+"_" + format_time+", dwFacePicID:"+str(struFaceSnap.dwFacePicID)) # 人脸图ID
# print(ip+"_" + format_time+", byAlarmEndMark:"+str(struFaceSnap.byAlarmEndMark))
if struFaceSnap.byAlarmEndMark == 1: # 报警结束标记:0- 保留,1- 结束标记(该字段结合人脸ID字段使用,表示该ID对应的下报警结束,用于判断报警结束,提取识别图片数据中,清晰度最高的图片)
with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_face.jpg"), "wb") as f:
f.write(face)
if struFaceSnap.pBuffer2:
back = (c_ubyte*struFaceSnap.dwBackgroundPicLen)()
memmove(back, struFaceSnap.pBuffer2, struFaceSnap.dwBackgroundPicLen)
with open(os.path.join(temp_dir, ip + "_01_" + format_time + "_back.jpg"), "wb") as f:
f.write(back)
# print("结构体中的变量,byUserIDValid输出:"+str(net_dvr_alarm.contents.byUserIDValid))
return True
def NET_DVR_SetDVRMessageCallBack_V31(sDVRIP, callBack):
if callCpp("NET_DVR_SetDVRMessageCallBack_V31", callBack, None):
logging.info(sDVRIP + ", 设置回调函数成功!")
return True
else:
logging.error(sDVRIP + ", 回调函数设置错误:" + str(callCpp("NET_DVR_GetLastError")))
return False
# iListenHandle = callCpp("NET_DVR_StartListen_V30", sLocalIP,wLocalPort,CALLFUNC(MSGCallBack),pUserData)
# 设置回调,
# MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER), c_void_p, c_uint32, c_void_p)
# callBack = MYCALLFUNC(MsgCallback)
https://blog.csdn.net/sweibd/article/details/52679213
一个大坑:需要注意CMPRESULTFUNC(OnPyVideoAnalyzeResultCallback)
这个指针函数是有自己的生存空间的,如果生存空间已过,会被释放,C 代码再回调的时候,就会使用一个过期指针。这里建议使用一个全局的 python 指针。
布防
def NET_DVR_SetupAlarmChan_V41(sDVRIP, lUserID):
struAlarmParam = NET_DVR_SETUPALARM_PARAM()
struAlarmParamSize = sizeof(struAlarmParam)
logging.info(sDVRIP + ", 结构体大小:" + str(struAlarmParamSize))
struAlarmParam.dwSize = struAlarmParamSize # 结构体大小
struAlarmParam.beLevel = 1 # 0 一级布防,1 二级布防
# struAlarmParam.byAlarmInfoType = 1 # 智能交通设备有效
struAlarmParam.byFaceAlarmDetection = 0 # 人脸侦测报警(人脸抓拍模块需要设置为 0)
struAlarmParam_p = byref(struAlarmParam)
m_lAlarmHandle = callCpp("NET_DVR_SetupAlarmChan_V41", lUserID, struAlarmParam_p)
logging.info(sDVRIP + ", 布防结果:" + str(m_lAlarmHandle))
if m_lAlarmHandle == -1:
logging.error(sDVRIP + ", 错误信息:" + str(callCpp("NET_DVR_GetLastError")))
else:
logging.info(sDVRIP + ", 布防成功")
return m_lAlarmHandle
撤防,释放 SDK 资源,用户注销
# 撤防
def NET_DVR_CloseAlarmChan_V30(sDVRIP, m_lAlarmHandle):
res = callCpp("NET_DVR_CloseAlarmChan_V30", m_lAlarmHandle)
if res < 0:
logging.error(sDVRIP + ", 撤防失败:" + str(callCpp("NET_DVR_GetLastError")))
return False
else:
logging.info(sDVRIP + ", 撤防成功")
return True
# 释放 SDK 资源
def NET_DVR_Cleanup():
res = callCpp("NET_DVR_Cleanup")
if res < 0:
print("SDK资源释放失败:"+str(callCpp("NET_DVR_GetLastError")))
else:
print("SDK资源成功释放!!")
# 用户注销
def NET_DVR_Logout_V30(sDVRIP, lUserID):
if callCpp("NET_DVR_Logout_V30", lUserID):
logging.info(sDVRIP + ", 用户已经成功注销")
else:
logging.error(sDVRIP + ", 用户注销失败:" + str(callCpp("NET_DVR_GetLastError")))
使用
from queue import Queue
picQueue = Queue() # 阻塞线程用
class FACESNAP_RESULT_Thread (threading.Thread):
def __init__(self, sDVRIP, wDVRPort, sUserName, sPassword):
threading.Thread.__init__(self)
self.sDVRIP = sDVRIP
self.wDVRPort = wDVRPort
self.sUserName = sUserName
self.sPassword = sPassword
def run(self):
lUserID = NET_DVR_Login_V30(self.sDVRIP, self.wDVRPort, self.sUserName, self.sPassword)
global threadsNum
if lUserID != -1:
# 设置回调
MYCALLFUNC = CFUNCTYPE(c_bool, c_long, POINTER(NET_DVR_ALARMER), c_void_p, c_uint32, c_void_p)
callBack = MYCALLFUNC(MsgCallback)
if NET_DVR_SetDVRMessageCallBack_V31(self.sDVRIP, callBack):
m_lAlarmHandle = NET_DVR_SetupAlarmChan_V41(self.sDVRIP, lUserID)
if m_lAlarmHandle != -1:
picQueue.get()
NET_DVR_CloseAlarmChan_V30(self.sDVRIP, m_lAlarmHandle)
NET_DVR_Logout_V30(self.sDVRIP, lUserID)
global lock
lock.acquire()
threadsNum -= 1
if threadsNum == 0:
picQueue.put(1) # 主线程退出
else:
picQueue.put(2) # 子线程退出
lock.release()
return
threadsNum -= 1
if threadsNum < 1:
picQueue.put(1)
threadsNum = 0 # 相机数,子线程数
lock = threading.Lock()
def main():
NET_DVR_Init() # 初始化 SDK
cameraTableValues=[["ip","port","username","password"], ["ip","port","username","password"]]
threadsNum = len(cameraTableValues)
threads = [] # 每个相机不同的线程
for camera in cameraTableValues:
t = FACESNAP_RESULT_Thread(camera[0], camera[1], camera[2], camera[3])
threads.append(t)
for th in threads:
th.start()
# 可以在回调函数中加入一定的逻辑,退出线程,picQueue.put(2)
while 1: # 阻塞线程,可以一直抓拍
if picQueue.get() == 1: # 代表子线程结束,可以退出了
NET_DVR_Cleanup()
break
else:
picQueue.put(2)
if __name__ == '__main__':
main()
抓图
# 抓图数据结构体
class NET_DVR_JPEGPARA(ctypes.Structure):
_fields_ = [
("wPicSize", ctypes.c_ushort),
("wPicQuality", ctypes.c_ushort)]
# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture():
sJpegPicFileName = bytes("pytest.jpg", "ascii")
lpJpegPara=NET_DVR_JPEGPARA()
lpJpegPara.wPicSize=0
lpJpegPara.wPicQuality=0
if (callCpp("NET_DVR_CaptureJPEGPicture", lUserID, lChannel, ctypes.byref(lpJpegPara), sJpegPicFileName)== False):
error_info = callCpp("NET_DVR_GetLastError")
print("抓图失败:" + str(error_info))
else:
print("抓图成功")
总结
该程序使用的海康 SDK 版本为:CH-HCNetSDKV5.3.5.46_build20180518_Win64。主要是实现了【人脸抓拍模块流程】,获得人脸图及背景图。
参考
Windows64下通过python调用海康SDK实现登入、预览、抓图、光学变倍、相机激活、区域聚焦、区域曝光功能
python 调用海康SDK,登录,设置回调,布防,撤防,注销以及释放SDK
python如何调用C, 如何注册成C的回调函数(python后台程序常用方法)
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 bin07280@qq.com
文章标题:Python 调用海康 SDK
文章字数:2.2k
本文作者:Bin
发布时间:2018-09-26, 16:06:22
最后更新:2019-08-06, 00:07:35
原始链接:http://coolview.github.io/2018/09/26/Python/Python%20%E8%B0%83%E7%94%A8%E6%B5%B7%E5%BA%B7%20SDK/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。