ECCE @ EIC Software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ophBuffer.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file ophBuffer.cc
1 
2 #include <sys/types.h>
3 #include <sys/stat.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6 
7 #include "ophBuffer.h"
8 
9 # include "Cframe.h"
10 # include "frameRoutines.h"
11 # include "frameHdr.h"
12 # include "A_Event.h"
13 
14 #include "BufferConstants.h"
15 #include "EventTypes.h"
16 
17 
18 // the constructor first ----------------
19 
20 
21 // the constructor first ----------------
22 ophBuffer::ophBuffer (const char *filename, PHDWORD * where, const int length, int &status
23  , const int irun, const int iseq)
24 {
25  status = 0;
26  our_fd = 1;
27 
28  fd = open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
29  S_IRWXU | S_IROTH | S_IRGRP );
30 
31  if ( fd < 0)
32  {
33  status =1;
34  good_object = 0;
35  return;
36  }
37  good_object = 1;
38  bptr = ( buffer_ptr) where;
39  data_ptr = &(bptr->data[0]);
43  bptr->ID = -64;
44  bptr->Bufseq = iseq;
45  bptr->Runnr = 0;
46  current_event = 0;
48  sequence = iseq;
49  eventsequence = 0;
50  runnumber = irun;
51  byteswritten = 0;
52 
53  prepare_next ();
54 }
55 
56 ophBuffer::ophBuffer (int fdin, PHDWORD * where, const int length
57  , const int irun, const int iseq)
58 {
59  fd = fdin;
60  our_fd = 0;
61  good_object = 1;
62  bptr = ( buffer_ptr) where;
63  data_ptr = &(bptr->data[0]);
67  bptr->ID = -64;
68  bptr->Bufseq = iseq;
69  bptr->Runnr = 0;
70  current_event = 0;
72  sequence = iseq;
73  eventsequence = 0;
74  runnumber = irun;
75  byteswritten = 0;
76 
77  prepare_next ();
78 }
79 
80 
81 // ---------------------------------------------------------
83 {
84 
85  // re-initialize the event header length
87  bptr->ID = -64;
88  bptr->Bufseq = 0;
89  sequence++;
90  bptr->Runnr = runnumber;
91 
92  current_index = 0;
94  has_end = 0;
95  dirty = 1;
96  return 0;
97 }
98 
99 
100 // ---------------------------------------------------------
101 int ophBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
102 {
103 
104  if (current_event) delete current_event;
105  current_event = 0;
106 
107  if (evtsize > max_size - EVTHEADERLENGTH) return -1;
108  if (evtsize <=0) return -2;
109 
110  if (evtsize > left-EOBLENGTH)
111  {
112  writeout();
113  prepare_next();
114  }
115 
116  if (etype >0) current_etype = etype;
117 
118  if (evtseq > 0) eventsequence = evtseq;
119  else eventsequence++;
120 
121 
122  current_event = new oEvent(&(bptr->data[current_index]), evtsize
124 
128  bptr->Bufseq++;
129 
130  dirty = 1;
131  return 0;
132 }
133 // ---------------------------------------------------------
134 int ophBuffer::addRawEvent( unsigned int *data)
135 {
136 
137  if ( ! good_object) return -1;
138  int wstatus;
139 
140  unsigned int nw = data[0];
141 
142  if ( nw > left-EOBLENGTH)
143  {
144  wstatus = writeout();
145  prepare_next();
146  if (wstatus) return wstatus;
147  }
148 
149  memcpy ( (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
150 
151  left -= nw;
152  current_index += nw;
153  bptr->Length += nw*4;
154  bptr->Bufseq++;
155  dirty =1;
156  return 0;
157 }
158 
159 
160 // ---------------------------------------------------------
162 {
163 
164  if ( ! good_object) return -1;
165  int nw;
166 
167  int wstatus;
168 
169  runnumber = Evt->getRunNumber();
170 
171  if ( Evt->getEvtLength() > left-EOBLENGTH)
172  {
173  wstatus = writeout();
174  prepare_next();
175  if (wstatus) return wstatus;
176  }
177 
178  Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
179 
180  left -= nw;
181  current_index += nw;
182  bptr->Length += nw*4;
183  bptr->Bufseq++;
184  dirty =1;
185  return 0;
186 }
187 
188 // ----------------------------------------------------------
190 {
191  int len;
192 
193  if ( ! good_object) return 0;
194 
195  len = current_event->addFrame(frame);
196 
197  left -= len;
198  current_index += len;
199  bptr->Length += len*4;
200 
201  // for (int k = 0; k<current_index; k++)
202  // COUT << k << " " << bptr->data[k] << std::endl;
203 
204  // COUT << "------------------" << std::endl;
205 
206 
207  return len;
208 }
209 
210 // ----------------------------------------------------------
212 {
213  int len;
214 
215  if ( ! good_object) return 0;
216 
217  len = current_event->addPacket(p);
218  if (len < 0) return 0;
219 
220  left -= len;
221  current_index += len;
222  bptr->Length += len*4;
223 
224 
225  return len;
226 }
227 
228 
229 
230 
231 // ----------------------------------------------------------
233  const int length,
234  const int id,
235  const int wordsize,
236  const int hitformat)
237 {
238  int len;
239 
240  if ( ! good_object) return 0;
241 
242  len = current_event->addUnstructPacketData(data, length
243  ,id , wordsize , hitformat);
244  if (len < 0) return 0;
245 
246  left -= len;
247  current_index += len;
248  bptr->Length += len*4;
249 
250  // for (int k = 0; k<current_index; k++)
251  // COUT << k << " " << bptr->data[k] << std::endl;
252 
253  // COUT << "------------------" << std::endl;
254 
255 
256  return len;
257 }
258 
259 
260 // ----------------------------------------------------------
261 // int ophBuffer::transfer(dataProtocol * protocol)
262 // {
263 // if (protocol)
264 // return protocol->transfer((char *) bptr, bptr->Length);
265 // else
266 // return 0;
267 
268 // }
269 
270 
271 // ----------------------------------------------------------
272 //
273 //
275 {
276 
277  // void *writeThread(void *);
278 
279  if ( ! good_object || fd <= 0 ) return -1;
280 
281 
282  if (! dirty) return 0;
283 
284  if (! has_end) addEoB();
285 
286  if (fd < 0) return 0;
287 
288 
289  unsigned int ip =0;
290  char *cp = (char *) bptr;
291 
292  while (ip < bptr->Length)
293  {
294  int n = write ( fd, cp, BUFFERBLOCKSIZE);
295  if ( n != BUFFERBLOCKSIZE)
296  {
297  std::cout << " could not write output, bytes written: " << n << std::endl;
298  return 0;
299  }
300  cp += BUFFERBLOCKSIZE;
301  ip+=BUFFERBLOCKSIZE;
302  }
303  dirty = 0;
304  byteswritten += ip;
305  return 0;
306 #ifdef WITHTHREADS
307 
308  }
309  else
310  {
311  //wait for a potential old thread to complete
312  if (ThreadId)
313  {
314  pthread_join(ThreadId, NULL);
315  byteswritten += thread_arg[2]; // the number of bytes written from previosu thread
316  }
317  if (! dirty) return 0;
318 
319  if (! has_end) addEoB();
320 
321  if (fd < 0) return 0;
322 
323  //swap the buffers around
324  buffer_ptr tmp = bptr_being_written;
325  bptr_being_written = bptr;
326  bptr = tmp;
327  dirty = 0;
328  // now fork off the write thread
329 
330  thread_arg[0] = (int) fd;
331  thread_arg[1] = (int) bptr_being_written;
332  thread_arg[2] = 0;
333 
334  // COUT << "starting write thread" << std::endl;
335  int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
336  //COUT << "create status is " << s << std::endl;
337 
338  return 0;
339 
340  }
341 #endif
342 }
343 
344 
345 // ----------------------------------------------------------
346 int ophBuffer::setMaxSize(const int size)
347 {
348  if (size < 0) return -1;
349  if (size == 0) max_size = max_length;
350  else
351  {
352  max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
353  if (max_size > max_length)
354  {
356  return -2;
357  }
358  }
359  return 0;
360 }
361 
362 // ----------------------------------------------------------
364 {
365  return max_size;
366 }
367 
368 // ----------------------------------------------------------
369 unsigned long long ophBuffer::getBytesWritten() const
370 {
371  return byteswritten;
372 }
373 
374 // ----------------------------------------------------------
376 {
377  if (has_end) return -1;
378  bptr->data[current_index++] = 2;
379  bptr->data[current_index++] = 0;
380  bptr->Length += 2*4;
381 
382  has_end = 1;
383  return 0;
384 }
385 
386 // ----------------------------------------------------------
388 {
389  writeout();
390  if (our_fd) close (fd);
391 
392 }
393 
394 
395 
396