Reimplemented message_queue with a circular buffer index

[SVN r79898]
This commit is contained in:
Ion Gaztañaga
2012-08-07 09:13:23 +00:00
parent b8a79215b3
commit 8ca8e9a2ab
3 changed files with 90 additions and 17 deletions

View File

@@ -622,14 +622,9 @@ to shared memory created with other process that don't use
Windows shared memory creation is a bit different from portable shared memory
creation: the size of the segment must be specified when creating the object and
can't be specified through `truncate` like with the shared memory object.
Take in care that when the last process attached to a shared memory is destroyed
[*the shared memory is destroyed] so there is [*no persistency] with native windows
shared memory. Native windows shared memory has also another limitation: a process can
open and map the whole shared memory created by another process but it can't know
which is the size of that memory. This limitation is imposed by the Windows API so
the user must somehow transmit the size of the segment to processes opening the
segment.
shared memory.
Sharing memory between services and user applications is also different. To share memory
between services and user applications the name of the shared memory must start with the
@@ -865,7 +860,7 @@ to create `mapped_region` objects. A mapped region created from a shared memory
object or a file mapping are the same class and this has many advantages.
One can, for example, mix in STL containers mapped regions from shared memory
and memory mapped files. The libraries that only depend on mapped regions can
and memory mapped files. Libraries that only depend on mapped regions can
be used to work with shared memory or memory mapped files without recompiling them.
[endsect]
@@ -881,7 +876,7 @@ surely different in each process. Since each process might have used its address
in a different way (allocation of more or less dynamic memory, for example), there is
no guarantee that the file/shared memory is going to be mapped in the same address.
If two processes map the same object in different addresses, this invalids the use
If two processes map the same object in different addresses, this invalidates the use
of pointers in that memory, since the pointer (which is an absolute address) would
only make sense for the process that wrote it. The solution for this is to use offsets
(distance) between objects instead of pointers: If two objects are placed in the same
@@ -6574,6 +6569,8 @@ example, a new managed shared memory that uses the new index:
[section:notes_windows Notes for Windows users]
[section:notes_windows_com_init COM Initialization]
[*Boost.Interprocess] uses the COM library to implement some features and initializes
the COM library with concurrency model `COINIT_APARTMENTTHREADED`.
If the COM library was already initialized by the calling thread for other model, [*Boost.Interprocess]
@@ -6588,6 +6585,44 @@ might want [*Boost.Interprocess] to initialize the COM library with another mode
[endsect]
[section:notes_windows_shm_folder Shared memory emulation folder]
Shared memory (`shared_memory_object`) is implemented in windows using memory mapped files, placed in a
directory in the shared documents folder (`SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders\Common AppData`).
This directory name is the last bootup time (obtained via COM calls), so that each bootup shared memory is created in a new
folder obtaining kernel persistence shared memory.
Unfortunately, due to COM implementation related errors, in Boost 1.48 & Boost 1.49 the bootup-time folder was dumped and files
were directly created in shared documents folder, reverting to filesystem persistence shared memory. Boost 1.50 fixed those issues
and recovered bootup time directory and kernel persistence. If you need to reproduce Boost 1.48 & Boost 1.49 behaviour to communicate
with applications compiled with that version, comment `#define BOOST_INTERPROCESS_HAS_KERNEL_BOOTTIME` directive
in the Windows configuration part of `boost/interprocess/detail/workaround.hpp`.
[endsect]
[endsect]
[section:notes_linux Notes for Linux users]
[section:notes_linux_overcommit Overcommit]
The committed address space is the total amount of virtual memory (swap or physical memory/RAM) that the kernel might have to supply
if all applications decide to access all of the memory they've requested from the kernel.
By default, Linux allows processes to commmit more virtual memory than available in the system. If that memory is not
accessed, no physical memory + swap is actually used.
The reason for this behaviour is that Linux tries to optimize memory usage on forked processes; fork() creates a full copy of
the process space, but with overcommitted memory, in this new forked instance only pages which have been written to actually need
to be allocated by the kernel. If applications access more memory than available, then the kernel must free memory in the hard way:
the OOM (Out Of Memory)-killer picks some processes to kill in order to recover memory.
[*Boost.Interprocess] has no way to change this behaviour and users might suffer the OOM-killer when accessing shared memory.
According to the [@http://www.kernel.org/doc/Documentation/vm/overcommit-accounting Kernel documentation], the
Linux kernel supports several overcommit modes. If you need non-kill guarantees in your application, you should
change this overcommit behaviour.
[endsect]
[endsect]
[section:thanks_to Thanks to...]
@@ -6635,6 +6670,19 @@ thank them:
[section:release_notes Release Notes]
[section:release_notes_boost_1_52_00 Boost 1.52 Release]
* Added `shrink_by` and `advise` functions in `mapped_region`.
* Reimplemented `message_queue` with a circular buffer index (the
old behavior used an ordered array, leading to excessive copies). This
should greatly increase performance but breaks ABI. Old behaviour/ABI can be used
undefining macro `BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX` in `boost/interprocess/detail/workaround.hpp`
* Improved `message_queue` insertion time avoiding priority search for common cases
(both array and circular buffer configurations).
[endsect]
[section:release_notes_boost_1_51_00 Boost 1.51 Release]
* Synchronous and asynchronous flushing for `mapped_region::flush`.

View File

@@ -13,7 +13,7 @@
<Configuration
Name="Debug|Win32"
OutputDirectory="../../Bin/Win32/Debug"
IntermediateDirectory="Debug/shared_message_queue_test"
IntermediateDirectory="Debug/message_queue_test"
ConfigurationType="1"
CharacterSet="2">
<Tool
@@ -35,7 +35,7 @@
<Tool
Name="VCLinkerTool"
AdditionalDependencies="winmm.lib"
OutputFile="$(OutDir)/shared_message_queue_test_d.exe"
OutputFile="$(OutDir)/message_queue_test_d.exe"
LinkIncremental="1"
AdditionalLibraryDirectories="../../../../stage/lib"
GenerateDebugInformation="TRUE"
@@ -67,7 +67,7 @@
<Configuration
Name="Release|Win32"
OutputDirectory="../../Bin/Win32/Release"
IntermediateDirectory="Release/shared_message_queue_test"
IntermediateDirectory="Release/message_queue_test"
ConfigurationType="1"
CharacterSet="2">
<Tool
@@ -86,7 +86,7 @@
<Tool
Name="VCLinkerTool"
AdditionalDependencies="winmm.lib"
OutputFile="$(OutDir)/shared_message_queue_test.exe"
OutputFile="$(OutDir)/message_queue_test.exe"
LinkIncremental="1"
AdditionalLibraryDirectories="../../../../stage/lib"
GenerateDebugInformation="TRUE"

View File

@@ -48,6 +48,8 @@ bool test_priority_order()
message_queue::size_type recvd = 0;
unsigned int priority = 0;
std::size_t tstamp;
unsigned int priority_prev;
std::size_t tstamp_prev;
//We will send 100 message with priority 0-9
//The message will contain the timestamp of the message
@@ -56,8 +58,31 @@ bool test_priority_order()
mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
}
unsigned int priority_prev = (std::numeric_limits<unsigned int>::max)();
std::size_t tstamp_prev = 0;
priority_prev = (std::numeric_limits<unsigned int>::max)();
tstamp_prev = 0;
//Receive all messages and test those are ordered
//by priority and by FIFO in the same priority
for(std::size_t i = 0; i < 100; ++i){
mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
if(priority > priority_prev)
return false;
if(priority == priority_prev &&
tstamp <= tstamp_prev){
return false;
}
priority_prev = priority;
tstamp_prev = tstamp;
}
//Now retry it with different priority order
for(std::size_t i = 0; i < 100; ++i){
tstamp = i;
mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
}
priority_prev = (std::numeric_limits<unsigned int>::max)();
tstamp_prev = 0;
//Receive all messages and test those are ordered
//by priority and by FIFO in the same priority
@@ -83,7 +108,7 @@ bool test_priority_order()
//another buffer and checks it against the original data-base
bool test_serialize_db()
{
//Typedef data to create a Interprocess map
//Typedef data to create a Interprocess map
typedef std::pair<const std::size_t, std::size_t> MyPair;
typedef std::less<std::size_t> MyLess;
typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
@@ -157,7 +182,7 @@ bool test_serialize_db()
break;
}
}
//The buffer will contain a copy of the original database
//so let's interpret the buffer with managed_external_buffer
managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
@@ -188,7 +213,7 @@ bool test_serialize_db()
return false;
}
}
//Destroy maps from db-s
db_origin.destroy_ptr(map1);
db_destiny.destroy_ptr(map2);