Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

# SPDX-License-Identifier: MIT 

 

import copy 

import logging 

import random 

import sched 

import time 

import threading 

 

from typing import Any, ClassVar, Dict, List, Tuple 

 

from ratbag_emu.actuator import Actuator 

from ratbag_emu.endpoint import Endpoint 

from ratbag_emu.firmware import Firmware 

from ratbag_emu.hw_component import HWComponent 

from ratbag_emu.util import EventData, mm2inch, ms2s 

 

 

class Device(object): 

''' 

Represents a real device 

 

:param name: Device name 

:param info: Bus information (bus, vid, pid) 

:param rdescs: Array of report descriptors 

''' 

device_list: ClassVar[List[str]] = [] 

 

def __init__(self, name: str = 'Generic Device', 

info: Tuple[int, int, int] = (0x03, 0x0001, 0x0001), 

rdescs: List[List[int]] = [[ 

# Generic mouse report descriptor 

0x05, 0x01, # .Usage Page (Generic Desktop) 0 

0x09, 0x02, # .Usage (Mouse) 2 

0xa1, 0x01, # .Collection (Application) 4 

0x09, 0x02, # ..Usage (Mouse) 6 

0xa1, 0x02, # ..Collection (Logical) 8 

0x09, 0x01, # ...Usage (Pointer) 10 

0xa1, 0x00, # ...Collection (Physical) 12 

0x05, 0x09, # ....Usage Page (Button) 14 

0x19, 0x01, # ....Usage Minimum (1) 16 

0x29, 0x03, # ....Usage Maximum (3) 18 

0x15, 0x00, # ....Logical Minimum (0) 20 

0x25, 0x01, # ....Logical Maximum (1) 22 

0x75, 0x01, # ....Report Size (1) 24 

0x95, 0x03, # ....Report Count (3) 26 

0x81, 0x02, # ....Input (Data,Var,Abs) 28 

0x75, 0x05, # ....Report Size (5) 30 

0x95, 0x01, # ....Report Count (1) 32 

0x81, 0x03, # ....Input (Cnst,Var,Abs) 34 

0x05, 0x01, # ....Usage Page (Generic Desktop) 36 

0x09, 0x30, # ....Usage (X) 38 

0x09, 0x31, # ....Usage (Y) 40 

0x15, 0x81, # ....Logical Minimum (-127) 42 

0x25, 0x7f, # ....Logical Maximum (127) 44 

0x75, 0x08, # ....Report Size (8) 46 

0x95, 0x02, # ....Report Count (2) 48 

0x81, 0x06, # ....Input (Data,Var,Rel) 50 

0xc0, # ...End Collection 52 

0xc0, # ..End Collection 53 

0xc0, # .End Collection 54 

]]): 

self.__logger = logging.getLogger('ratbag-emu.device') 

self._name = name 

self._info = info 

self._rdescs = rdescs 

 

# Find a unique ID for this device 

unique = False 

while not unique: 

self.id = self.generate_name() 

try: 

for id in self.device_list: 

assert id != self.id 

unique = True 

except AssertionError: 

pass 

 

self.endpoints = [] 

for i, r in enumerate(rdescs): 

self.endpoints.append(Endpoint(self, r, i)) 

 

self.report_rate = 100 

self.fw = Firmware(self) 

self.hw: Dict[str, HWComponent] = {} 

self.actuators: List[Actuator] = [] 

 

@classmethod 

def generate_name(cls) -> str: 

''' 

Generates a random name 

''' 

device_names = [ 

'mara', 'capybara', 'porcupine', 'paca', 

'vole', 'woodrat', 'gerbil', 'shrew', 

'hutia', 'beaver', 'squirrel', 'chinchilla', 

'rabbit', 'viscacha', 'hare', 'degu', 

'gundi', 'acouchy', 'nutria', 'paca', 

'hamster', 'zokor', 'chipmunk', 'gopher', 

'marmot', 'groundhog', 'suslik', 'agouti', 

'blesmol', 

] 

 

device_attr = [ 

'sobbing', 'whooping', 'barking', 'yapping', 

'howling', 'squawking', 'cheering', 'warbling', 

'thundering', 'booming', 'blustering', 'humming', 

'crying', 'bawling', 'roaring', 'raging', 

'chanting', 'crooning', 'murmuring', 'bellowing', 

'wailing', 'weeping', 'screaming', 'yelling', 

'yodeling', 'singing', 'honking', 'hooting', 

'whispering', 'hollering', 

] 

 

name = device_names[random.randint(0, len(device_names)-1)] 

attr = device_attr[random.randint(0, len(device_attr)-1)] 

return '-'.join([attr, name]) 

 

@property 

def name(self): 

return self._name 

 

@property 

def info(self): 

return self._info 

 

@property 

def rdescs(self): 

return self._rdescs 

 

@property 

def actuators(self): 

return self._actuators 

 

@actuators.setter 

def actuators(self, val): 

# Make sure we don't have actuators which will act on the same keys 

seen = [] 

for keys in [a.keys for a in val]: 

for el in keys: 

assert el not in seen 

seen.append(el) 

 

self._actuators = val 

 

def destroy(self): 

for endpoint in self.endpoints: 

endpoint.destroy() 

 

def transform_action(self, data: Dict[str, Any]): 

''' 

Transforms high-level action according to the actuators 

 

A high-level action will have the x, y values in mm. This values will 

be converted to pixel by the device actuators (in this case, the 

sensor/dpi actuator) 

 

:param action: high-level action 

''' 

hid_data: Dict[str, Any] = {} 

 

for actuator in self.actuators: 

hid_data.update(actuator.transform(data.copy())) 

 

return hid_data 

 

def send_hid_action(self, action: object): 

''' 

Sends a HID action 

 

We assume there's only one endpoint for each type of action (mouse, 

keyboard, button, etc.) so we send the action to all endpoints. The 

endpoint will only act on the action if it supports it. 

 

:param action: HID action 

''' 

for endpoint in self.endpoints: 

# FIXME: global data (0x11)? 

endpoint.send(endpoint.create_report(action, 0x11)) 

 

def simulate_action(self, action: Dict[str, Any], type: int = None): 

''' 

Simulates action 

 

Translates physical values according to the device properties and 

converts action into HID reports. 

 

:param action: high-level action 

:param type: HID report type 

''' 

 

packets: Dict[int, EventData] = {} 

 

report_count = int(round(ms2s(action['duration']) * self.report_rate)) 

 

if not report_count: 

report_count = 1 

 

# XY movement 

if action['type'] == 'xy': 

# We assume a linear motion 

pixel_buffer = {} 

step = {} 

 

''' 

Initialize pixel_buffer, real_pixel_buffer and step for X and Y 

 

pixel_buffer holds the ammount of pixels left to send (kinda, 

read below). 

 

We actually have two variables for this, real_pixel_buffer and 

pixel_buffer. pixel_buffer mimics the user movement and 

real_pixel_buffer holds true number of pixels left to send. 

When using high report rates (ex. 1000Hz) we usually don't 

have a full pixel to send, that's why we need two variables. We 

subtract the step to pixel_buffer at each iteration, when the 

difference between pixel_buffer and real_pixel_buffer is equal 

or higher than 1 pixel we then send a HID report to the device 

with that difference (int! we send the int part of the 

difference) and update real_pixel_buffer to include this 

changes. 

''' 

pixel_buffer = self.transform_action(action['data']) 

 

for attr in ['x', 'y']: 

assert pixel_buffer[attr] <= 127 * report_count 

step[attr] = pixel_buffer[attr] / report_count 

 

real_pixel_buffer = copy.deepcopy(pixel_buffer) 

 

i = 0 

while real_pixel_buffer['x'] != 0: 

if i not in packets: 

packets[i] = EventData() 

 

for attr in ['x', 'y']: 

pixel_buffer[attr] -= step[attr] 

diff = int(round(real_pixel_buffer[attr] - 

pixel_buffer[attr])) 

# FIXME: Read max size from the report descriptor 

''' 

The max is 127, if this happens we need to leave the 

excess in the buffer for it to be sent in the next 

report 

''' 

if abs(diff) >= 1: 

if abs(diff) > 127: 

diff = 127 if diff > 0 else -127 

setattr(packets[i], attr, diff) 

real_pixel_buffer[attr] -= diff 

i += 1 

 

# Button 

elif action['type'] == 'button': 

for i in range(report_count): 

if i not in packets: 

packets[i] = EventData() 

 

setattr(packets[i], 'b{}'.format(action['data']['id']), 1) 

 

def send_packets(): 

nonlocal packets 

s = sched.scheduler(time.time, time.sleep) 

next_time = 0 

for i in range(len(packets)): 

s.enter(next_time, 1, self.send_hid_action, 

kwargs={'action': packets[i]}) 

next_time += 1 / self.report_rate 

s.run() 

 

sim_thread = threading.Thread(target=send_packets) 

sim_thread.start()