C# - What type of IProducerConsumerCollection<T> to use for my task?
I have 100 sensors, each "measuring" its own data. I have one DataSender that should send the information from the "sensors". The most recent information should be sent.
The bandwidth of the channel may be less than the data produced by the 100 sensors. In that case some data can be skipped, but we should be "roughly fair". For example, we could skip every second measurement from each sensor.
I don't know how often each sensor generates data, but in general they generate data pretty often.
After my other posts, I have decided that I have a classical producer/consumer problem, with:
- 100 producers, and
- 1 consumer
It has been suggested that I use BlockingCollection for this. The only problem with BlockingCollection is that once you have added an item, you cannot replace it. But in my application, if a sensor produces a new value and the previous value has not yet been processed by the consumer, the value should be replaced.
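To make the replacement problem concrete, here is a minimal sketch showing that a BlockingCollection with its default backing store keeps every reading and hands out the oldest one first. The StaleValuesDemo class and the use of sensor id 33 with double readings are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; not part of any library.
public static class StaleValuesDemo
{
    // Adds three readings for the same sensor and reports how many are
    // queued and which reading the consumer would receive first.
    public static (int Queued, double First) Run()
    {
        // With no collection supplied, BlockingCollection<T> uses a
        // ConcurrentQueue<T>, so items come out in FIFO order.
        using (var queue = new BlockingCollection<Tuple<int, double>>())
        {
            queue.Add(Tuple.Create(33, 100.1)); // oldest reading
            queue.Add(Tuple.Create(33, 101.0));
            queue.Add(Tuple.Create(33, 101.9)); // newest reading

            int queued = queue.Count;           // nothing was replaced
            double first = queue.Take().Item2;  // the oldest reading is taken first
            return (queued, first);
        }
    }
}
```

Calling StaleValuesDemo.Run() reports three queued items and 100.1 as the first value taken, so the consumer would have to work through two stale readings before reaching the current one.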
Should I use a ConcurrentDictionary or a ConcurrentBag for this task?
Conceptually, all I need is an array of 100 elements.
Sensor #33 should replace its value in array[33]:
| Sensor | Value |
|--------|-------|
| 1      |       |
| 2      |       |
| 3      |       |
| ...    | ...   |
| 32     |       |
| 33     | 101.9 |
| 34     |       |
| ...    | ...   |
| 98     |       |
| 99     |       |
| 100    |       |
The consumer should take the value from array[33] and, if it is not null, send it and set array[33] to null. The consumer should react to any non-null values in the array as soon as possible.
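The slot idea described above can be sketched roughly as follows. This is only an illustration under assumptions: the LatestValueSlots class, its Publish and Take methods, and the use of double readings are all made up here, and it relies on there being exactly one consumer thread.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of "one slot per sensor, newest value wins".
public sealed class LatestValueSlots
{
    // Readings are boxed so that null can mean "nothing pending" and a
    // slot can be swapped atomically with Interlocked.Exchange.
    private sealed class Reading
    {
        public readonly double Value;
        public Reading(double value) { Value = value; }
    }

    private readonly Reading[] m_slots;
    // Counts slots that went from empty to filled and have not been taken yet.
    private readonly SemaphoreSlim m_pending = new SemaphoreSlim(0);
    private int m_next; // round-robin start index, used only by the consumer

    public LatestValueSlots(int sensorCount)
    {
        m_slots = new Reading[sensorCount];
    }

    // Called by sensor threads: overwrite the slot, and wake the consumer
    // only when the slot was previously empty.
    public void Publish(int sensor, double value)
    {
        var previous = Interlocked.Exchange(ref m_slots[sensor], new Reading(value));
        if (previous == null)
            m_pending.Release();
    }

    // Called by the single consumer: blocks until some slot is filled,
    // then empties the next filled slot in round-robin order.
    public void Take(out int sensor, out double value)
    {
        m_pending.Wait();
        for (int i = 0; i < m_slots.Length; i++)
        {
            int index = (m_next + i) % m_slots.Length;
            var reading = Interlocked.Exchange<Reading>(ref m_slots[index], null);
            if (reading != null)
            {
                m_next = (index + 1) % m_slots.Length; // resume after this sensor
                sensor = index;
                value = reading.Value;
                return;
            }
        }
        throw new InvalidOperationException("No filled slot found; is there more than one consumer?");
    }
}
```

The round-robin start index is what keeps the scan "roughly fair": a sensor that publishes very often only overwrites its own slot and is visited again only after the scan has passed every other sensor.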
I think you should implement your own IProducerConsumerCollection<T>. That's why it's an interface: so that you can easily make your own.
You could do it using a Dictionary<K,V> and a Queue<T> to make sure that receiving the data is fair, i.e. if you have one device that produces data very fast, you won't send data from just that one.
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    // Latest pending value for each device.
    private readonly Dictionary<TDevice, TData> m_data = new Dictionary<TDevice, TData>();
    // Devices that have a pending value, in the order they will be served.
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    // some not implemented methods elided, make sure they are thread-safe

    public int Count
    {
        get { return m_queue.Count; }
    }

    public object SyncRoot
    {
        get { return m_lockObject; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;

        lock (m_lockObject)
        {
            // Enqueue the device only if it has no pending value;
            // otherwise just overwrite the pending value.
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);
            m_data[device] = data;
        }

        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }

            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}
```
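The class only compiles against IProducerConsumerCollection<T> once the elided members exist. One possible way to fill them in is sketched below. The name CompleteDeviceDataQueue and the bodies of ToArray, CopyTo and GetEnumerator are assumptions made for this example (each works on a snapshot taken under the lock), not the original implementation. Count is also read under the lock here.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical completed variant; members marked "assumed" are guesses.
public class CompleteDeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    public int Count { get { lock (m_lockObject) return m_queue.Count; } }
    public object SyncRoot { get { return m_lockObject; } }
    public bool IsSynchronized { get { return true; } }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            // Queue the device only once; later values overwrite the pending one.
            if (!m_data.ContainsKey(item.Item1))
                m_queue.Enqueue(item.Item1);
            m_data[item.Item1] = item.Item2;
        }
        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0) { item = null; return false; }
            var device = m_queue.Dequeue();
            item = Tuple.Create(device, m_data[device]);
            m_data.Remove(device);
            return true;
        }
    }

    // Assumed: snapshot of the pending items in the order they would be taken.
    public Tuple<TDevice, TData>[] ToArray()
    {
        lock (m_lockObject)
        {
            var result = new Tuple<TDevice, TData>[m_queue.Count];
            int i = 0;
            foreach (var device in m_queue)
                result[i++] = Tuple.Create(device, m_data[device]);
            return result;
        }
    }

    // Assumed: the remaining members all delegate to the snapshot.
    public void CopyTo(Tuple<TDevice, TData>[] array, int index) { ToArray().CopyTo(array, index); }
    void ICollection.CopyTo(Array array, int index) { ToArray().CopyTo(array, index); }
    public IEnumerator<Tuple<TDevice, TData>> GetEnumerator()
    {
        return ((IEnumerable<Tuple<TDevice, TData>>)ToArray()).GetEnumerator();
    }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
```

For example, after calling TryAdd with ("A", 1), ("A", 2), ("B", 10) and ("A", 3), Count is 2 and TryTake yields ("A", 3) followed by ("B", 10): device A keeps its place in the queue but only its newest value survives. One thing worth verifying before relying on the BlockingCollection wrapper: it appears to track the item count on its own, so a TryAdd that overwrites a pending value and still returns true may leave that count higher than the number of items actually stored.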
When used along these lines:
```csharp
// Device, IDevice and Data are application types that are not shown here.
var queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());

var device1 = new Device(1, TimeSpan.FromSeconds(3), queue);
var device2 = new Device(2, TimeSpan.FromSeconds(5), queue);

while (true)
{
    var tuple = queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;

    Console.WriteLine("{0}: device {1} produced data @ {2}.",
        DateTime.Now, device.Id, data.Created);

    // Simulate a slow consumer.
    Thread.Sleep(TimeSpan.FromSeconds(2));
}
```
It produces the following output:
```
30.4.2011 20:40:43: device 1 produced data @ 30.4.2011 20:40:43.
30.4.2011 20:40:45: device 2 produced data @ 30.4.2011 20:40:44.
30.4.2011 20:40:47: device 1 produced data @ 30.4.2011 20:40:47.
30.4.2011 20:40:49: device 2 produced data @ 30.4.2011 20:40:49.
30.4.2011 20:40:51: device 1 produced data @ 30.4.2011 20:40:51.
30.4.2011 20:40:54: device 2 produced data @ 30.4.2011 20:40:54.
```