ECCE @ EIC Software
Reference for
ECCE @ EIC
simulation and reconstruction software on GitHub
Home page
Related Pages
Modules
Namespaces
Classes
Files
External Links
File List
File Members
All
Classes
Namespaces
Files
Functions
Variables
Typedefs
Enumerations
Enumerator
Friends
Macros
Groups
Pages
ophBuffer.cc
Go to the documentation of this file.
Or view
the newest version in sPHENIX GitHub for file ophBuffer.cc
1
2
#include <sys/types.h>
3
#include <sys/stat.h>
4
#include <fcntl.h>
5
#include <unistd.h>
6
7
#include "
ophBuffer.h
"
8
9
# include "
Cframe.h
"
10
# include "
frameRoutines.h
"
11
# include "
frameHdr.h
"
12
# include "
A_Event.h
"
13
14
#include "
BufferConstants.h
"
15
#include "
EventTypes.h
"
16
17
18
// the constructor first ----------------
19
20
21
// the constructor first ----------------
22
ophBuffer::ophBuffer
(
const
char
*
filename
,
PHDWORD
* where,
const
int
length
,
int
&status
23
,
const
int
irun,
const
int
iseq)
24
{
25
status = 0;
26
our_fd
= 1;
27
28
fd
=
open
(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
29
S_IRWXU | S_IROTH | S_IRGRP );
30
31
if
(
fd
< 0)
32
{
33
status =1;
34
good_object
= 0;
35
return
;
36
}
37
good_object
= 1;
38
bptr
= (
buffer_ptr
) where;
39
data_ptr
= &(
bptr
->
data
[0]);
40
max_length
=
length
;
41
max_size
=
max_length
;
42
left
=
max_size
-
BUFFERHEADERLENGTH
;
43
bptr
->
ID
= -64;
44
bptr
->
Bufseq
= iseq;
45
bptr
->
Runnr
= 0;
46
current_event
= 0;
47
current_etype
=
DATAEVENT
;
48
sequence
= iseq;
49
eventsequence
= 0;
50
runnumber
= irun;
51
byteswritten
= 0;
52
53
prepare_next
();
54
}
55
56
ophBuffer::ophBuffer
(
int
fdin,
PHDWORD
* where,
const
int
length
57
,
const
int
irun,
const
int
iseq)
58
{
59
fd
= fdin;
60
our_fd
= 0;
61
good_object
= 1;
62
bptr
= (
buffer_ptr
) where;
63
data_ptr
= &(
bptr
->
data
[0]);
64
max_length
=
length
;
65
max_size
=
max_length
;
66
left
=
max_size
-
BUFFERHEADERLENGTH
;
67
bptr
->
ID
= -64;
68
bptr
->
Bufseq
= iseq;
69
bptr
->
Runnr
= 0;
70
current_event
= 0;
71
current_etype
=
DATAEVENT
;
72
sequence
= iseq;
73
eventsequence
= 0;
74
runnumber
= irun;
75
byteswritten
= 0;
76
77
prepare_next
();
78
}
79
80
81
// ---------------------------------------------------------
82
int
ophBuffer::prepare_next
()
83
{
84
85
// re-initialize the event header length
86
bptr
->
Length
=
BUFFERHEADERLENGTH
*4;
87
bptr
->
ID
= -64;
88
bptr
->
Bufseq
= 0;
89
sequence
++;
90
bptr
->
Runnr
=
runnumber
;
91
92
current_index
= 0;
93
left
=
max_size
-
BUFFERHEADERLENGTH
;
94
has_end
= 0;
95
dirty
= 1;
96
return
0;
97
}
98
99
100
// ---------------------------------------------------------
101
int
ophBuffer::nextEvent
(
const
unsigned
int
evtsize,
const
int
etype,
const
int
evtseq)
102
{
103
104
if
(
current_event
)
delete
current_event
;
105
current_event
= 0;
106
107
if
(evtsize >
max_size
-
EVTHEADERLENGTH
)
return
-1;
108
if
(evtsize <=0)
return
-2;
109
110
if
(evtsize >
left
-
EOBLENGTH
)
111
{
112
writeout
();
113
prepare_next
();
114
}
115
116
if
(etype >0)
current_etype
= etype;
117
118
if
(evtseq > 0)
eventsequence
= evtseq;
119
else
eventsequence
++;
120
121
122
current_event
=
new
oEvent
(&(
bptr
->
data
[
current_index
]), evtsize
123
,
bptr
->
Runnr
,
current_etype
,
eventsequence
);
124
125
left
-=
EVTHEADERLENGTH
;
126
current_index
+=
EVTHEADERLENGTH
;
127
bptr
->
Length
+=
EVTHEADERLENGTH
*4;
128
bptr
->
Bufseq
++;
129
130
dirty
= 1;
131
return
0;
132
}
133
// ---------------------------------------------------------
134
int
ophBuffer::addRawEvent
(
unsigned
int
*
data
)
135
{
136
137
if
( !
good_object
)
return
-1;
138
int
wstatus;
139
140
unsigned
int
nw = data[0];
141
142
if
( nw >
left
-
EOBLENGTH
)
143
{
144
wstatus =
writeout
();
145
prepare_next
();
146
if
(wstatus)
return
wstatus;
147
}
148
149
memcpy ( (
char
*) &(
bptr
->
data
[
current_index
]), (
char
*) data, 4*nw);
150
151
left
-= nw;
152
current_index
+= nw;
153
bptr
->
Length
+= nw*4;
154
bptr
->
Bufseq
++;
155
dirty
=1;
156
return
0;
157
}
158
159
160
// ---------------------------------------------------------
161
int
ophBuffer::addEvent
(
Event
*Evt)
162
{
163
164
if
( !
good_object
)
return
-1;
165
int
nw;
166
167
int
wstatus;
168
169
runnumber
= Evt->
getRunNumber
();
170
171
if
( Evt->
getEvtLength
() >
left
-
EOBLENGTH
)
172
{
173
wstatus =
writeout
();
174
prepare_next
();
175
if
(wstatus)
return
wstatus;
176
}
177
178
Evt->
Copy
( (
int
*) &(
bptr
->
data
[
current_index
]), Evt->
getEvtLength
(), &nw);
179
180
left
-= nw;
181
current_index
+= nw;
182
bptr
->
Length
+= nw*4;
183
bptr
->
Bufseq
++;
184
dirty
=1;
185
return
0;
186
}
187
188
// ----------------------------------------------------------
189
int
ophBuffer::addFrame
(
PHDWORD
*frame)
190
{
191
int
len
;
192
193
if
( !
good_object
)
return
0;
194
195
len =
current_event
->
addFrame
(frame);
196
197
left
-=
len
;
198
current_index
+=
len
;
199
bptr
->
Length
+= len*4;
200
201
// for (int k = 0; k<current_index; k++)
202
// COUT << k << " " << bptr->data[k] << std::endl;
203
204
// COUT << "------------------" << std::endl;
205
206
207
return
len
;
208
}
209
210
// ----------------------------------------------------------
211
int
ophBuffer::addPacket
(
const
Packet
*
p
)
212
{
213
int
len
;
214
215
if
( !
good_object
)
return
0;
216
217
len =
current_event
->
addPacket
(p);
218
if
(len < 0)
return
0;
219
220
left
-=
len
;
221
current_index
+=
len
;
222
bptr
->
Length
+= len*4;
223
224
225
return
len
;
226
}
227
228
229
230
231
// ----------------------------------------------------------
232
int
ophBuffer::addUnstructPacketData
(
PHDWORD
*
data
,
233
const
int
length
,
234
const
int
id
,
235
const
int
wordsize,
236
const
int
hitformat)
237
{
238
int
len
;
239
240
if
( !
good_object
)
return
0;
241
242
len =
current_event
->
addUnstructPacketData
(data, length
243
,
id
, wordsize , hitformat);
244
if
(len < 0)
return
0;
245
246
left
-=
len
;
247
current_index
+=
len
;
248
bptr
->
Length
+= len*4;
249
250
// for (int k = 0; k<current_index; k++)
251
// COUT << k << " " << bptr->data[k] << std::endl;
252
253
// COUT << "------------------" << std::endl;
254
255
256
return
len
;
257
}
258
259
260
// ----------------------------------------------------------
261
// int ophBuffer::transfer(dataProtocol * protocol)
262
// {
263
// if (protocol)
264
// return protocol->transfer((char *) bptr, bptr->Length);
265
// else
266
// return 0;
267
268
// }
269
270
271
// ----------------------------------------------------------
272
//
273
//
274
int
ophBuffer::writeout
()
275
{
276
277
// void *writeThread(void *);
278
279
if
( !
good_object
||
fd
<= 0 )
return
-1;
280
281
282
if
(!
dirty
)
return
0;
283
284
if
(!
has_end
)
addEoB
();
285
286
if
(
fd
< 0)
return
0;
287
288
289
unsigned
int
ip =0;
290
char
*
cp
= (
char
*)
bptr
;
291
292
while
(ip < bptr->Length)
293
{
294
int
n
= write (
fd
, cp,
BUFFERBLOCKSIZE
);
295
if
( n !=
BUFFERBLOCKSIZE
)
296
{
297
std::cout <<
" could not write output, bytes written: "
<< n << std::endl;
298
return
0;
299
}
300
cp +=
BUFFERBLOCKSIZE
;
301
ip+=
BUFFERBLOCKSIZE
;
302
}
303
dirty
= 0;
304
byteswritten
+= ip;
305
return
0;
306
#ifdef WITHTHREADS
307
308
}
309
else
310
{
311
//wait for a potential old thread to complete
312
if
(ThreadId)
313
{
314
pthread_join(ThreadId, NULL);
315
byteswritten += thread_arg[2];
// the number of bytes written from previosu thread
316
}
317
if
(! dirty)
return
0;
318
319
if
(! has_end) addEoB();
320
321
if
(
fd
< 0)
return
0;
322
323
//swap the buffers around
324
buffer_ptr
tmp
= bptr_being_written;
325
bptr_being_written = bptr;
326
bptr =
tmp
;
327
dirty = 0;
328
// now fork off the write thread
329
330
thread_arg[0] = (
int
)
fd
;
331
thread_arg[1] = (
int
) bptr_being_written;
332
thread_arg[2] = 0;
333
334
// COUT << "starting write thread" << std::endl;
335
int
s
= pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (
void
*) thread_arg);
336
//COUT << "create status is " << s << std::endl;
337
338
return
0;
339
340
}
341
#endif
342
}
343
344
345
// ----------------------------------------------------------
346
int
ophBuffer::setMaxSize
(
const
int
size)
347
{
348
if
(size < 0)
return
-1;
349
if
(size == 0)
max_size
=
max_length
;
350
else
351
{
352
max_size
= (size + (
BUFFERBLOCKSIZE
- size%
BUFFERBLOCKSIZE
) ) /4;
353
if
(
max_size
>
max_length
)
354
{
355
max_size
=
max_length
;
356
return
-2;
357
}
358
}
359
return
0;
360
}
361
362
// ----------------------------------------------------------
363
int
ophBuffer::getMaxSize
()
const
364
{
365
return
max_size
;
366
}
367
368
// ----------------------------------------------------------
369
unsigned
long
long
ophBuffer::getBytesWritten
()
const
370
{
371
return
byteswritten
;
372
}
373
374
// ----------------------------------------------------------
375
int
ophBuffer::addEoB
()
376
{
377
if
(
has_end
)
return
-1;
378
bptr
->
data
[
current_index
++] = 2;
379
bptr
->
data
[
current_index
++] = 0;
380
bptr
->
Length
+= 2*4;
381
382
has_end
= 1;
383
return
0;
384
}
385
386
// ----------------------------------------------------------
387
ophBuffer::~ophBuffer
()
388
{
389
writeout
();
390
if
(
our_fd
)
close
(
fd
);
391
392
}
393
394
395
396
online_distribution
blob
master
newbasic
ophBuffer.cc
Built by
Jin Huang
. updated:
Wed Jun 29 2022 17:25:59
using
1.8.2 with
ECCE GitHub integration