Source code for pypot.sensor.kinect.sensor

"""
This code has been developed by Baptiste Busch: https://github.com/buschbapti

This module allows you to retrieve Skeleton information from a Kinect device.
It is only the client side of a zmq client/server application.

The server part can be found at: https://bitbucket.org/buschbapti/kinectserver/src
It used the Microsoft Kinect SDK and thus only work on Windows.

Of course, the client side can be used on any platform.

"""

import zmq
import numpy
import threading

from collections import namedtuple

from ...utils import Point3D, Point2D, Quaternion

torso_joints = ('hip_center', 'spine', 'shoulder_center', 'head')
left_arm_joints = ('shoulder_left', 'elbow_left', 'wrist_left', 'hand_left')
right_arm_joints = ('shoulder_right', 'elbow_right', 'wrist_right', 'hand_right')
left_leg_joints = ('hip_left', 'knee_left', 'ankle_left', 'foot_left')
right_leg_joints = ('hip_right', 'knee_right', 'ankle_right', 'foot_right')
skeleton_joints = torso_joints + left_arm_joints + right_arm_joints + left_leg_joints + right_leg_joints


[docs]class Skeleton(namedtuple('Skeleton', ('timestamp', 'user_id') + skeleton_joints)): joints = skeleton_joints
Joint = namedtuple('Joint', ('position', 'orientation', 'pixel_coordinate'))
[docs]class KinectSensor(object): def __init__(self, addr, port): self._lock = threading.Lock() self._skeleton = {} self.context = zmq.Context() self.sub_skel = self.context.socket(zmq.SUB) self.sub_skel.connect('tcp://{}:{}'.format(addr, port)) self.sub_skel.setsockopt(zmq.SUBSCRIBE, '') t = threading.Thread(target=self.get_skeleton) t.daemon = True t.start()
[docs] def remove_user(self, user_index): with self._lock: del self._skeleton[user_index]
[docs] def remove_all_users(self): with self._lock: self._skeleton = {}
@property def tracked_skeleton(self): with self._lock: return self._skeleton @tracked_skeleton.setter def tracked_skeleton(self, skeleton): with self._lock: self._skeleton[skeleton.user_id] = skeleton
[docs] def get_skeleton(self): while True: md = self.sub_skel.recv_json() msg = self.sub_skel.recv() skel_array = numpy.fromstring(msg, dtype=float, sep=",") skel_array = skel_array.reshape(md['shape']) nb_joints = md['shape'][0] joints = [] for i in range(nb_joints): x, y, z, w = skel_array[i][0:4] position = Point3D(x / w, y / w, z / w) pixel_coord = Point2D(*skel_array[i][4:6]) orientation = Quaternion(*skel_array[i][6:10]) joints.append(Joint(position, orientation, pixel_coord)) self.tracked_skeleton = Skeleton(md['timestamp'], md['user_index'], *joints)
[docs] def run(self): cv2.startWindowThread() while True: img = numpy.zeros((480, 640, 3)) skeleton = kinect.tracked_skeleton if skeleton: for user, skel in skeleton.iteritems(): for joint_name in skel.joints: x, y = getattr(skel, joint_name).pixel_coordinate pt = (int(x), int(y)) cv2.circle(img, pt, 5, (255, 255, 255), thickness=-1) kinect.remove_all_users() cv2.imshow('Skeleton', img) cv2.waitKey(50) self.sub_skel.close() self.context.term()
if __name__ == '__main__': import cv2 kinect = KinectSensor('193.50.110.177', 9999) kinect.run()