Compare commits

...

6 Commits

Author SHA1 Message Date
mainzer
beee2bdb64 Added skeletal version of H5FDsubfiling.c and H5FDsubfiling.h.
These are based on H5FDsec2, and will have to be re-worked heavily
for sub-filing.

Also added minimal test (based on existing sec2 tests) in test/vfd.c
Since sub-filing is mostly a parallel feature, most testing will have
to be moved to testpar.  That said, a serial version of sub-filing will
be neeced eventually, so it makes sense to retain at least a stub in
the serial vfd tests.

Tested serial and parallel on charis and jelly.  Encountered errors in
h5diff tests in the parallel builds, but am ignoring them as they also
appear in the version prior to this checkin.
2020-07-16 15:07:59 -05:00
Richard Warren
8b71674880 Various bug fixes for the multi-threaded IO concentrator(s) 2020-07-01 12:15:16 -04:00
mainzer
5a6d9b3a4e Added support for vector I/O to the VFD layer, and associated test code
in test/vfd.c.  Note that this test uses only sec2 and stdio at present.
Since no VFDs support vector I/O at present, the vector I/O VFD calls
are translated to regular read/write calls in the VFD layer.

Tested serial and parallel, debug and production on charis, parallel
debug on jelly.  Some seemingly un-related issues, but after consultation
with Richard, I'm proceeding with the checkin.
2020-06-29 11:09:40 -05:00
Richard Warren
96e912c842 Added the subfile_readwrite test and modified the implementation as a result of debugging 2020-06-25 11:09:23 -04:00
Richard Warren
72268e7d67 Update Makefile.am files and fix issues unearthed by building with autotools 2020-06-23 14:59:57 -04:00
Richard Warren
5e02da94f1 Initial subfiling branch 2020-06-22 14:46:24 -04:00
71 changed files with 11572 additions and 19 deletions

View File

@@ -710,8 +710,12 @@
./src/H5FDspace.c
./src/H5FDsplitter.c
./src/H5FDsplitter.h
./src/H5FDsubfile.c
./src/H5FDsubfile.h
./src/H5FDstdio.c
./src/H5FDstdio.h
./src/H5FDsubfiling.c
./src/H5FDsubfiling.h
./src/H5FDtest.c
./src/H5FDwindows.c
./src/H5FDwindows.h

View File

@@ -243,6 +243,9 @@ set (H5FD_SOURCES
${HDF5_SRC_DIR}/H5FDstdio.c
${HDF5_SRC_DIR}/H5FDtest.c
${HDF5_SRC_DIR}/H5FDwindows.c
${HDF5_SRC_DIR}/H5FDsubfile.c
${HDF5_SRC_DIR}/H5FDsubfile_threads.c
${HDF5_SRC_DIR}/H5FDsubfile_mpi.c
)
set (H5FD_HDRS
@@ -262,6 +265,10 @@ set (H5FD_HDRS
${HDF5_SRC_DIR}/H5FDsplitter.h
${HDF5_SRC_DIR}/H5FDstdio.h
${HDF5_SRC_DIR}/H5FDwindows.h
${HDF5_SRC_DIR}/H5FDsubfile_public.h
${HDF5_SRC_DIR}/mercury/mercury_thread.h
${HDF5_SRC_DIR}/mercury/mercury_thread_mutex.h
${HDF5_SRC_DIR}/mercury/mercury_log.h
)
IDE_GENERATED_PROPERTIES ("H5FD" "${H5FD_HDRS}" "${H5FD_SOURCES}" )

View File

@@ -1476,6 +1476,157 @@ done:
FUNC_LEAVE_API(ret_value)
} /* end H5FDwrite() */
/*-------------------------------------------------------------------------
* Function: H5FDread_vector
*
* Purpose: Perform count reads from the specified file at the offsets
* provided in the addrs array, with the lengths and memory
* types provided in the sizes and types arrays. Data read
* is returned in the buffers provided in the bufs array.
*
* All reads are done according to the data transfer property
* list dxpl_id (which may be the constant H5P_DEFAULT).
*
* Return: Success: SUCCEED
* All reads have completed successfully, and
* the results havce been into the supplied
* buffers.
*
* Failure: FAIL
* The contents of supplied buffers are undefined.
*
* Programmer: JRM -- 6/10/20
*
* Changes: None.
*
*-------------------------------------------------------------------------
*/
herr_t
H5FDread_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
void *bufs[] /* out */)
{
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_API(FAIL)
H5TRACE7("e", "*xiIu*Mt*a*zx", file, dxpl_id, count, types, addrs, sizes, bufs);
/* Check arguments */
if(!file)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL")
if(!file->cls)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file class pointer cannot be NULL")
if((!types) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "types parameter can't be NULL if count is positive")
if((!addrs) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addrs parameter can't be NULL if count is positive")
if((!sizes) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "sizes parameter can't be NULL if count is positive")
if((!bufs) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "bufs parameter can't be NULL if count is positive")
/* Get the default dataset transfer property list if the user
* didn't provide one
*/
if(H5P_DEFAULT == dxpl_id) {
dxpl_id = H5P_DATASET_XFER_DEFAULT;
} else {
if(TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list")
}
/* Set DXPL for operation */
H5CX_set_dxpl(dxpl_id);
/* Call private function */
/* JRM -- review this */
/* (Note compensating for base addresses addition in internal routine) */
if(H5FD_read_vector(file, count, types, addrs, sizes, bufs) < 0)
HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "file vector read request failed")
done:
FUNC_LEAVE_API(ret_value)
} /* end H5FDread_vector() */
/*-------------------------------------------------------------------------
* Function: H5FDwrite_vector
*
* Purpose: Perform count writes to the specified file at the offsets
* provided in the addrs array, with the lengths and memory
* types provided in the sizes and types arrays. Data to be
* written is in the buffers provided in the bufs array.
*
* All writes are done according to the data transfer property
* list dxpl_id (which may be the constant H5P_DEFAULT).
*
* Return: Success: SUCCEED
* All writes have completed successfully
*
* Failure: FAIL
* One or more of the writes failed.
*
* Programmer: JRM -- 6/10/20
*
* Changes: None.
*
*-------------------------------------------------------------------------
*/
herr_t
H5FDwrite_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
void *bufs[] /* in */)
{
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_API(FAIL)
H5TRACE7("e", "*xiIu*Mt*a*z**x", file, dxpl_id, count, types, addrs, sizes, bufs);
/* Check arguments */
if(!file)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL")
if(!file->cls)
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file class pointer cannot be NULL")
if((!types) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "types parameter can't be NULL if count is positive")
if((!addrs) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addrs parameter can't be NULL if count is positive")
if((!sizes) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "sizes parameter can't be NULL if count is positive")
if((!bufs) && (count > 0))
HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "bufs parameter can't be NULL if count is positive")
/* Get the default dataset transfer property list if the user didn't provide one */
if(H5P_DEFAULT == dxpl_id) {
dxpl_id = H5P_DATASET_XFER_DEFAULT;
} else {
if(TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER))
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list")
}
/* Set DXPL for operation */
H5CX_set_dxpl(dxpl_id);
/* Call private function */ /* JRM -- review this */
/* (Note compensating for base address addition in internal routine) */
if(H5FD_write_vector(file, count, types, addrs, sizes, bufs) < 0)
HGOTO_ERROR(H5E_VFL, H5E_WRITEERROR, FAIL, "file vector write request failed")
done:
FUNC_LEAVE_API(ret_value)
} /* end H5FDwrite_vector() */
/*-------------------------------------------------------------------------
* Function: H5FDflush

View File

@@ -175,6 +175,8 @@ static const H5FD_class_t H5FD_core_g = {
H5FD__core_get_handle, /* get_handle */
H5FD__core_read, /* read */
H5FD__core_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
H5FD__core_flush, /* flush */
H5FD__core_truncate, /* truncate */
H5FD_core_lock, /* lock */

View File

@@ -167,6 +167,8 @@ static const H5FD_class_t H5FD_direct_g = {
H5FD_direct_get_handle, /*get_handle */
H5FD_direct_read, /*read */
H5FD_direct_write, /*write */
NULL, /* read_vector */
NULL, /* write_vector */
NULL, /*flush */
H5FD_direct_truncate, /*truncate */
H5FD_direct_lock, /*lock */

View File

@@ -135,6 +135,8 @@ static const H5FD_class_t H5FD_family_g = {
H5FD_family_get_handle, /*get_handle */
H5FD_family_read, /*read */
H5FD_family_write, /*write */
NULL, /*read_vector */
NULL, /*write_vector */
H5FD_family_flush, /*flush */
H5FD_family_truncate, /*truncate */
H5FD_family_lock, /*lock */

View File

@@ -517,6 +517,8 @@ static const H5FD_class_t H5FD_hdfs_g = {
H5FD_hdfs_get_handle, /* get_handle */
H5FD_hdfs_read, /* read */
H5FD_hdfs_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
NULL, /* flush */
H5FD_hdfs_truncate, /* truncate */
H5FD_hdfs_lock, /* lock */

View File

@@ -252,6 +252,259 @@ done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5FD_write() */
/*-------------------------------------------------------------------------
* Function: H5FD_read_vector
*
* Purpose: Private version of H5FDread_vector()
*
* Perform count reads from the specified file at the offsets
* provided in the addrs array, with the lengths and memory
* types provided in the sizes and types arrays. Data read
* is returned in the buffers provided in the bufs array.
*
* If the underlying VFD supports vector reads, pass the
* call through directly.
*
* If it doesn't, convert the vector read into a sequence
* of individual reads.
*
* Note that it is not in general possible to convert a
* vector read into a selection read, because each element
* in the vector read may have a different memory type.
* In contrast, selection reads are of a single type.
*
* Return: Success: SUCCEED
* All reads have completed successfully, and
* the results havce been into the supplied
* buffers.
*
* Failure: FAIL
* The contents of supplied buffers are undefined.
*
* Programmer: JRM -- 6/10/20
*
* Changes: None
*
*-------------------------------------------------------------------------
*/
herr_t
H5FD_read_vector(H5FD_t *file, uint32_t count, H5FD_mem_t types[],
haddr_t addrs[], size_t sizes[], void * bufs[] /* out */)
{
hbool_t addrs_cooked = FALSE;
uint32_t i;
hid_t dxpl_id = H5I_INVALID_HID; /* DXPL for operation */
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_NOAPI(FAIL)
/* Sanity checks */
HDassert(file);
HDassert(file->cls);
HDassert((types) || (count == 0));
HDassert((addrs) || (count == 0));
HDassert((sizes) || (count == 0));
HDassert((bufs) || (count == 0));
/* Get proper DXPL for I/O */
dxpl_id = H5CX_get_dxpl();
#ifndef H5_HAVE_PARALLEL
/* The no-op case
*
* Do not return early for Parallel mode since the I/O could be a
* collective transfer.
*/
if(0 == count) {
HGOTO_DONE(SUCCEED)
}
#endif /* H5_HAVE_PARALLEL */
if ( file->base_addr > 0 ) {
/* apply the base_addr offset to the addrs array. Must undo before
* we return.
*/
for ( i = 0; i < count; i++ ) {
addrs[i] += file->base_addr;
}
addrs_cooked = TRUE;
}
/* If the file is open for SWMR read access, allow access to data past
* the end of the allocated space (the 'eoa'). This is done because the
* eoa stored in the file's superblock might be out of sync with the
* objects being written within the file by the application performing
* SWMR write operations.
*/
if(!(file->access_flags & H5F_ACC_SWMR_READ)) {
haddr_t eoa;
for ( i = 0; i < count; i++ ) {
if(HADDR_UNDEF == (eoa = (file->cls->get_eoa)(file, types[i])))
HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, FAIL, "driver get_eoa request failed")
if((addrs[i] + sizes[i]) > eoa)
HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, FAIL, "addr overflow, addrs[%d] = %llu, sizes[%d] = %llu, eoa = %llu", (int)i, (unsigned long long)(addrs[i]), (int)i, (unsigned long long)sizes[i], (unsigned long long)eoa)
}
}
/* if the underlying VFD supports vector read, make the call */
if (file->cls->read_vector) {
if ((file->cls->read_vector)(file, dxpl_id, count, types, addrs, sizes, bufs) < 0)
HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "driver read vector request failed")
} else {
/* otherwise, implement the vector read as a sequence of regular
* read calls.
*/
for ( i = 0; i < count; i++ ) {
if((file->cls->read)(file, types[i], dxpl_id, addrs[i], sizes[i], bufs[i]) < 0)
HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "driver read request failed")
}
}
done:
/* undo the base addr offset to the addrs array if necessary */
if ( addrs_cooked ) {
HDassert(file->base_addr > 0);
for ( i = 0; i < count; i++ ) {
addrs[i] -= file->base_addr;
}
}
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5FD_read_vector() */
/*-------------------------------------------------------------------------
* Function: H5FD_write_vector
*
* Purpose: Private version of H5FDwrite_vector()
*
* Perform count writes to the specified file at the offsets
* provided in the addrs array, with the lengths and memory
* types provided in the sizes and types arrays. Data written
* is taken from the buffers provided in the bufs array.
*
* If the underlying VFD supports vector writes, pass the
* call through directly.
*
* If it doesn't, convert the vector write into a sequence
* of individual writes.
*
* Note that it is not in general possible to convert a
* vector write into a selection write, because each element
* in the vector read may have a different memory type.
* In contrast, selection writes are of a single type.
*
* Return: Success: SUCCEED
* All writes have completed successfully.
*
* Failure: FAIL
* One or more writes failed.
*
* Programmer: JRM -- 6/10/20
*
* Changes: None
*
*-------------------------------------------------------------------------
*/
herr_t
H5FD_write_vector(H5FD_t *file, uint32_t count, H5FD_mem_t types[],
haddr_t addrs[], size_t sizes[], void * bufs[] /* out */)
{
hbool_t addrs_cooked = FALSE;
uint32_t i;
hid_t dxpl_id; /* DXPL for operation */
haddr_t eoa = HADDR_UNDEF; /* EOA for file */
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_NOAPI(FAIL)
/* Sanity checks */
HDassert(file);
HDassert(file->cls);
HDassert((types) || (count == 0));
HDassert((addrs) || (count == 0));
HDassert((sizes) || (count == 0));
HDassert((bufs) || (count == 0));
/* Get proper DXPL for I/O */
dxpl_id = H5CX_get_dxpl();
#ifndef H5_HAVE_PARALLEL
/* The no-op case
*
* Do not return early for Parallel mode since the I/O could be a
* collective transfer.
*/
if(0 == count)
HGOTO_DONE(SUCCEED)
#endif /* H5_HAVE_PARALLEL */
if ( file->base_addr > 0 ) {
/* apply the base_addr offset to the addrs array. Must undo before
* we return.
*/
for ( i = 0; i < count; i++ ) {
addrs[i] += file->base_addr;
}
addrs_cooked = TRUE;
}
for ( i = 0; i < count; i++ ) {
if(HADDR_UNDEF == (eoa = (file->cls->get_eoa)(file, types[i])))
HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, FAIL, "driver get_eoa request failed")
if((addrs[i] + sizes[i]) > eoa)
HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, FAIL, "addr overflow, addrs[%d] = %llu, sizes[%d] = %llu, eoa = %llu", (int)i, (unsigned long long)(addrs[i]), (int)i, (unsigned long long)sizes[i], (unsigned long long)eoa)
}
/* if the underlying VFD supports vector write, make the call */
if (file->cls->write_vector) {
if ((file->cls->write_vector)(file, dxpl_id, count, types, addrs, sizes, bufs) < 0)
HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "driver write vector request failed")
} else {
/* otherwise, implement the vector write as a sequence of regular
* write calls.
*/
for ( i = 0; i < count; i++ ) {
if((file->cls->write)(file, types[i], dxpl_id, addrs[i], sizes[i], bufs[i]) < 0)
HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "driver write request failed")
}
}
done:
/* undo the base addr offset to the addrs array if necessary */
if ( addrs_cooked ) {
HDassert(file->base_addr > 0);
for ( i = 0; i < count; i++ ) {
addrs[i] -= file->base_addr;
}
}
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5FD_write() */
/*-------------------------------------------------------------------------
* Function: H5FD_set_eoa

View File

@@ -211,6 +211,8 @@ static const H5FD_class_t H5FD_log_g = {
H5FD_log_get_handle, /*get_handle */
H5FD_log_read, /*read */
H5FD_log_write, /*write */
NULL, /*read_vector */
NULL, /*write_vector */
NULL, /*flush */
H5FD_log_truncate, /*truncate */
H5FD_log_lock, /*lock */

View File

@@ -193,6 +193,8 @@ static const H5FD_class_t H5FD_mirror_g = {
NULL, /* get_handle */
H5FD_mirror_read, /* read */
H5FD_mirror_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
NULL, /* flush */
H5FD_mirror_truncate, /* truncate */
H5FD_mirror_lock, /* lock */

View File

@@ -123,6 +123,8 @@ static const H5FD_class_mpi_t H5FD_mpio_g = {
H5FD__mpio_get_handle, /*get_handle */
H5FD__mpio_read, /*read */
H5FD__mpio_write, /*write */
NULL, /*read_vector */
NULL, /*write_vector */
H5FD__mpio_flush, /*flush */
H5FD__mpio_truncate, /*truncate */
NULL, /*lock */

View File

@@ -166,6 +166,8 @@ static const H5FD_class_t H5FD_multi_g = {
H5FD_multi_get_handle, /*get_handle */
H5FD_multi_read, /*read */
H5FD_multi_write, /*write */
NULL, /*read_vector */
NULL, /*write_vector */
H5FD_multi_flush, /*flush */
H5FD_multi_truncate, /*truncate */
H5FD_multi_lock, /*lock */

View File

@@ -137,6 +137,12 @@ H5_DLL herr_t H5FD_set_feature_flags(H5FD_t *file, unsigned long feature_flags);
H5_DLL herr_t H5FD_get_fs_type_map(const H5FD_t *file, H5FD_mem_t *type_map);
H5_DLL herr_t H5FD_read(H5FD_t *file, H5FD_mem_t type, haddr_t addr, size_t size, void *buf/*out*/);
H5_DLL herr_t H5FD_write(H5FD_t *file, H5FD_mem_t type, haddr_t addr, size_t size, const void *buf);
H5_DLL herr_t H5FD_read_vector(H5FD_t *file, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
void * bufs[] /* out */);
H5_DLL herr_t H5FD_write_vector(H5FD_t *file, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
void * bufs[] /* out */);
H5_DLL herr_t H5FD_flush(H5FD_t *file, hbool_t closing);
H5_DLL herr_t H5FD_truncate(H5FD_t *file, hbool_t closing);
H5_DLL herr_t H5FD_lock(H5FD_t *file, hbool_t rw);

View File

@@ -298,6 +298,12 @@ typedef struct H5FD_class_t {
haddr_t addr, size_t size, void *buffer);
herr_t (*write)(H5FD_t *file, H5FD_mem_t type, hid_t dxpl,
haddr_t addr, size_t size, const void *buffer);
herr_t (*read_vector)(H5FD_t *file, hid_t dxpl, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], void * bufs[]);
herr_t (*write_vector)(H5FD_t *file, hid_t dxpl, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], void * bufs[]);
herr_t (*flush)(H5FD_t *file, hid_t dxpl_id, hbool_t closing);
herr_t (*truncate)(H5FD_t *file, hid_t dxpl_id, hbool_t closing);
herr_t (*lock)(H5FD_t *file, hbool_t rw);
@@ -381,6 +387,12 @@ H5_DLL herr_t H5FDread(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id,
haddr_t addr, size_t size, void *buf/*out*/);
H5_DLL herr_t H5FDwrite(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id,
haddr_t addr, size_t size, const void *buf);
H5_DLL herr_t H5FDread_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], void * bufs[] /* out */);
H5_DLL herr_t H5FDwrite_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count,
H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], void * bufs[] /* in */);
H5_DLL herr_t H5FDflush(H5FD_t *file, hid_t dxpl_id, hbool_t closing);
H5_DLL herr_t H5FDtruncate(H5FD_t *file, hid_t dxpl_id, hbool_t closing);
H5_DLL herr_t H5FDlock(H5FD_t *file, hbool_t rw);

View File

@@ -268,6 +268,8 @@ static const H5FD_class_t H5FD_ros3_g = {
H5FD_ros3_get_handle, /* get_handle */
H5FD_ros3_read, /* read */
H5FD_ros3_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
NULL, /* flush */
H5FD_ros3_truncate, /* truncate */
H5FD_ros3_lock, /* lock */

View File

@@ -167,6 +167,8 @@ static const H5FD_class_t H5FD_sec2_g = {
H5FD_sec2_get_handle, /* get_handle */
H5FD_sec2_read, /* read */
H5FD_sec2_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
NULL, /* flush */
H5FD_sec2_truncate, /* truncate */
H5FD_sec2_lock, /* lock */

View File

@@ -160,6 +160,8 @@ static const H5FD_class_t H5FD_splitter_g = {
H5FD_splitter_get_handle, /* get_handle */
H5FD_splitter_read, /* read */
H5FD_splitter_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
H5FD_splitter_flush, /* flush */
H5FD_splitter_truncate, /* truncate */
H5FD_splitter_lock, /* lock */

View File

@@ -205,6 +205,8 @@ static const H5FD_class_t H5FD_stdio_g = {
H5FD_stdio_get_handle, /* get_handle */
H5FD_stdio_read, /* read */
H5FD_stdio_write, /* write */
NULL, /* read_vector */
NULL, /* write_vector */
H5FD_stdio_flush, /* flush */
H5FD_stdio_truncate, /* truncate */
H5FD_stdio_lock, /* lock */

273
src/H5FDsubfile.c Normal file
View File

@@ -0,0 +1,273 @@
#include "H5FDsubfile_public.h"
#ifdef H5_HAVE_PARALLEL
/***********/
/* Headers */
/***********/
#include "H5private.h" /* Generic Functions */
#include "H5CXprivate.h" /* API Contexts */
#include "H5Dprivate.h" /* Datasets */
#include "H5Eprivate.h" /* Error handling */
#include "H5Ipublic.h" /* IDs */
#include "H5Iprivate.h" /* IDs */
#include "H5MMprivate.h" /* Memory management */
#include "H5Pprivate.h" /* Property lists */
/*
=========================================
Private functions
========================================
*/
static size_t sf_topology_limit = 4;
static size_t sf_topology_entries = 0;
static sf_topology_t **sf_topology_cache = NULL;
static size_t sf_context_limit = 4;
static size_t sf_context_entries = 0;
static subfiling_context_t **sf_context_cache = NULL;
static hid_t context_id = H5I_INVALID_HID;
static hid_t topology_id = H5I_INVALID_HID;
static int64_t record_subfiling_object(SF_OBJ_TYPE type, void *obj)
{
size_t index;
int64_t obj_reference;
uint64_t tag;
switch(type) {
case SF_TOPOLOGY: {
if (sf_topology_cache == NULL) {
sf_topology_cache = (sf_topology_t **)
calloc(sf_topology_limit, sizeof(sf_topology_t *));
}
assert(sf_topology_cache != NULL);
index = sf_topology_entries++;
tag = SF_TOPOLOGY;
obj_reference = (int64_t)((tag << 32) | index);
sf_topology_cache[index] = obj;
return obj_reference;
break;
}
case SF_CONTEXT: {
if (sf_context_cache == NULL) {
sf_context_cache = (subfiling_context_t **)
calloc(sf_context_limit, sizeof(subfiling_context_t *));
}
assert(sf_context_cache != NULL);
index = sf_context_entries++;
tag = SF_CONTEXT;
obj_reference = (int64_t)((tag << 32) | index);
sf_context_cache[index] = (subfiling_context_t *)obj;
return obj_reference;
break;
}
default:
puts("UNKNOWN Subfiling object type");
}
return -1;
}
/*
=========================================
Public vars (for subfiling) and functions
========================================
*/
int sf_verbose_flag = 0;
/*
=========================================
File functions
=========================================
The pread and pwrite posix functions are described as
being thread safe. We include mutex locks and unlocks
to work around any potential threading conflicts...
Those however, are compiled according #ifdef
*/
int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
{
int ret = 0;
ssize_t bytes_read;
ssize_t bytes_remaining = (ssize_t)data_size;
char *this_buffer = data_buffer;
while(bytes_remaining) {
if ((bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset)) < 0) {
perror("pread failed!");
fflush(stdout);
}
else if (bytes_read > 0) {
if (sf_verbose_flag) {
printf("[ioc(%d) %s] read %ld bytes of %ld requested\n",
subfile_rank, __func__,
bytes_read, bytes_remaining);
}
bytes_remaining -= bytes_read;
this_buffer += bytes_read;
file_offset += bytes_read;
}
else {
printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__ );
fflush(stdout);
break;
}
}
return ret;
}
int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
{
int ret = 0;
char *this_data = (char *)data_buffer;
ssize_t bytes_remaining = (ssize_t) data_size;
ssize_t written = 0;
while(bytes_remaining) {
if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) {
perror("pwrite failed!");
fflush(stdout);
break;
}
else {
if (sf_verbose_flag) {
printf("[ioc(%d) %s] wrote %ld bytes of %ld requested\n",
subfile_rank, __func__,
written, bytes_remaining);
}
bytes_remaining -= written;
this_data += written;
file_offset += written;
}
}
#ifdef SUBFILE_REQUIRE_FLUSH
fdatasync(fd);
#endif
return ret;
}
void * get_subfiling_object(int64_t object_id)
{
int obj_type = (int)((object_id >> 32) & 0x0FFFF);
/* We don't require a large indexing space
* 16 bits should be enough..
*/
size_t index = (object_id & 0x0FFFF);
if (obj_type == SF_TOPOLOGY) {
if (index < sf_context_entries) {
return (void *)sf_topology_cache[index];
}
else {
puts("Illegal object index");
}
}
else if (obj_type == SF_CONTEXT) {
if (index < sf_context_entries) {
return (void *)sf_context_cache[index];
}
else {
puts("Illegal object index");
}
}
else {
puts("UNKNOWN Subfiling object type");
}
return NULL;
}
herr_t
H5FDsubfiling_init(void)
{
herr_t ret_value = SUCCEED;
int ioc_count;
int world_rank, world_size;
sf_topology_t *thisApp = NULL;
subfiling_context_t *newContext = NULL;
FUNC_ENTER_API(FAIL)
H5TRACE0("e","");
if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) {
puts("MPI_Comm_size returned an error");
ret_value = FAIL;
goto done;
}
if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) {
puts("MPI_Comm_rank returned an error");
ret_value = FAIL;
goto done;
}
if ((ioc_count = H5FD__determine_ioc_count (world_size, world_rank, &thisApp)) > 0) {
topology_id = (hid_t)record_subfiling_object(SF_TOPOLOGY, thisApp);
}
if (topology_id < 0) {
puts("Unable to register subfiling topology!");
ret_value = FAIL;
goto done;
}
if (H5FD__init_subfile_context(&newContext, ioc_count, world_size, world_rank, thisApp->rank_is_ioc) != SUCCEED) {
puts("Unable to initialize a subfiling context!");
ret_value = FAIL;
goto done;
}
context_id = (hid_t)record_subfiling_object(SF_CONTEXT, newContext);
if (context_id < 0) {
ret_value = FAIL;
puts("Unable to register subfiling context!");
}
done:
FUNC_LEAVE_API(ret_value)
return ret_value;
}
herr_t
H5FDsubfiling_finalize(void)
{
herr_t ret_value = SUCCEED; /* Return value */
sf_topology_t *thisApp = NULL;
FUNC_ENTER_API(FAIL)
H5TRACE0("e","");
/* Shutdown the IO Concentrator threads */
if (topology_id != H5I_INVALID_HID) {
thisApp = get_subfiling_object(topology_id);
}
if (thisApp && thisApp->rank_is_ioc) {
begin_thread_exclusive();
sf_shutdown_flag = 1;
end_thread_exclusive();
usleep(100);
wait_for_thread_main();
}
MPI_Barrier(MPI_COMM_WORLD);
delete_subfiling_context(context_id);
FUNC_LEAVE_API(ret_value)
done:
return ret_value;
}
hid_t
get_subfiling_context(void)
{
return context_id;
}
#endif /* H5_HAVE_PARALLEL */

0
src/H5FDsubfile.h Normal file
View File

1642
src/H5FDsubfile_mpi.c Normal file

File diff suppressed because it is too large Load Diff

192
src/H5FDsubfile_private.h Normal file
View File

@@ -0,0 +1,192 @@
/********************/
/* Standard Headers */
/********************/
#include <assert.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/**************/
/* H5 Headers */
/**************/
#include "H5private.h" /* Generic Functions */
#include "H5CXprivate.h" /* API Contexts */
#include "H5Dprivate.h" /* Datasets */
#include "H5Eprivate.h" /* Error handling */
#include "H5Ipublic.h"
#include "H5Iprivate.h" /* IDs */
#include "H5MMprivate.h" /* Memory management */
#include "H5Pprivate.h" /* Property lists */
#include "mpi.h"
#ifndef _H5FDsubfile_private_H
#define _H5FDsubfile_private_H
typedef int (*file_close_cb)(int,MPI_Comm);
typedef struct {
int64_t sf_stripe_size;
int64_t sf_blocksize_per_stripe;
MPI_Comm sf_msg_comm;
MPI_Comm sf_data_comm;
MPI_Comm sf_group_comm;
MPI_Comm sf_intercomm;
int sf_group_size;
int sf_group_rank;
int sf_intercomm_root;
char *subfile_prefix;
} subfiling_context_t;
typedef struct {
/* {Datasize, Offset} */
int64_t header[2];
int tag;
int source;
int subfile_rank;
} sf_work_request_t;
typedef struct {
long rank;
long hostid;
} layout_t;
typedef struct {
long hostid;
layout_t *topology;
int *node_ranks;
int node_count;
int node_index;
int local_peers;
int subfile_rank;
int world_rank;
int world_size;
bool rank_is_ioc;
} sf_topology_t;
#define K(n) ((n)*1024)
#define DEFAULT_STRIPE_SIZE K(256) /* (1024*1024) */
#define MAX_DEPTH 256
typedef enum io_ops {
READ_OP = 1,
WRITE_OP = 2,
OPEN_OP = 3,
CLOSE_OP = 4,
INCR_OP = 8,
DECR_OP = 16,
} io_op_t;
typedef enum {
SF_BADID = (-1),
SF_TOPOLOGY = 1,
SF_CONTEXT,
SF_NTYPES /* number of subfiling object types, MUST BE LAST */
} SF_OBJ_TYPE;
/* MPI Tags are 32 bits, we treat them as unsigned
* to allow the use of the available bits for RPC
* selections:
* 0000
* 0001 READ_OP (Independent)
* 0010 WRITE_OP (Independent)
* 0011 /////////
* 0100 CLOSE_OP (Independent)
* -----
* 1000
* 1001 COLLECTIVE_READ
* 1010 COLLECTIVE_WRITE
* 1011 /////////
* 1100 COLLECTIVE_CLOSE
*
* 31 28 24 20 16 12 8 4 0|
* +-------+-------+-------+-------+-------+-------+-------+-------+
* | | | ACKS | OP |
* +-------+-------+-------+-------+-------+-------+-------+-------+
*
*/
/* Bit 3 SET indicates collectives */
#define COLL_FUNC (0x1 << 3)
#define ACK_PART (0x0acc << 8)
#define DATA_PART (0xd8da << 8)
#define READY (0xfeed << 8)
#define COMPLETED (0xfed1 << 8)
#define READ_INDEP (READ_OP)
#define READ_COLL (COLL_FUNC | READ_OP)
#define WRITE_INDEP (WRITE_OP)
#define WRITE_COLL (COLL_FUNC | WRITE_OP)
#define WRITE_INDEP_ACK (ACK_PART | WRITE_OP)
#define WRITE_INDEP_DATA (DATA_PART | WRITE_OP)
#define READ_INDEP_DATA (DATA_PART | READ_OP)
#define INT32_MASK 0x07FFFFFFFFFFFFFFF
extern int sf_verbose_flag;
extern int sf_shutdown_flag;
extern atomic_int sf_workinprogress;
extern atomic_int sf_work_pending;
extern atomic_int sf_file_close_count;
extern atomic_int sf_file_refcount;
/*
-------------
Messages IN
-------------
*/
extern MPI_Comm sf_msg_comm;
/*
-------------
Messages OUT
-------------
*/
extern MPI_Comm sf_data_comm;
H5_DLL int H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisapp);
H5_DLL int H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc);
H5_DLL void * get_subfiling_object(int64_t object_id);
H5_DLL hid_t get_subfiling_context(void);
H5_DLL int initialize_ioc_threads(subfiling_context_t *sf_context);
H5_DLL int tpool_add_work(sf_work_request_t *);
H5_DLL bool tpool_is_empty(void);
H5_DLL int ioc_main(subfiling_context_t *context);
H5_DLL int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int subfiling_close_file(int subfile_rank, MPI_Comm comm);
H5_DLL int subfiling_open_file(const char *prefix, int subfile_rank, MPI_Comm comm);
H5_DLL int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file_close_cb callback_ftn);
H5_DLL int sf_open_subfiles(hid_t context_id, char *prefix, int flags);
H5_DLL int sf_close_subfiles(hid_t context_id);
H5_DLL int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank);
H5_DLL int sf_read_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data);
H5_DLL int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int dtype_extent, void *data);
H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank);
H5_DLL void delete_subfiling_context(hid_t context_id);
H5_DLL void finalize_ioc_threads(void);
H5_DLL int begin_thread_exclusive(void);
H5_DLL int end_thread_exclusive(void);
H5_DLL int wait_for_thread_main(void);
H5_DLL int finalize_subfile_close(void);
#endif

11
src/H5FDsubfile_public.h Normal file
View File

@@ -0,0 +1,11 @@
#ifndef _H5FDsubfile_public_H
#define _H5FDsubfile_public_H
#include "H5FDsubfile_private.h"
herr_t H5FDsubfiling_init(void);
herr_t H5FDsubfiling_finalize(void);
#endif /* _H5FDsubfile_public_H */

161
src/H5FDsubfile_threads.c Normal file
View File

@@ -0,0 +1,161 @@
#include "H5FDsubfile_private.h"
#include "mercury/mercury_util_config.h"
#include "mercury/mercury_log.h"
#include "mercury/mercury_log.c"
#include "mercury/mercury_util_error.c"
#include "mercury/mercury_thread.c"
#include "mercury/mercury_thread_mutex.c"
#include "mercury/mercury_thread_condition.h"
#include "mercury/mercury_thread_condition.c"
#include "mercury/mercury_thread_pool.c"
#include "mercury/mercury_thread_spin.c"
static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_pool_t *ioc_thread_pool = NULL;
static hg_thread_t ioc_thread;
#ifndef HG_TEST_NUM_THREADS_DEFAULT
#define HG_TEST_NUM_THREADS_DEFAULT 4
#endif
#define POOL_CONCURRENT_MAX 64
static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX];
static HG_THREAD_RETURN_TYPE
ioc_thread_main(void *arg)
{
hg_thread_ret_t thread_ret = (hg_thread_ret_t) 0;
/* Pass along the subfiling_context_t */
ioc_main(arg);
// hg_thread_exit(thread_ret);
return thread_ret;
}
int
initialize_ioc_threads(subfiling_context_t *sf_context)
{
int status;
status = hg_thread_mutex_init(&ioc_mutex);
if (status) {
puts("hg_thread_mutex_init failed");
goto err_exit;
}
status = hg_thread_mutex_init(&ioc_thread_mutex);
if (status) {
puts("hg_thread_mutex_init failed");
goto err_exit;
}
status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
if (status) {
puts("hg_thread_pool_init failed");
goto err_exit;
}
status = hg_thread_create(&ioc_thread, ioc_thread_main, sf_context);
if (status) {
puts("hg_thread_create failed");
goto err_exit;
}
return 0;
err_exit:
return -1;
}
void __attribute__((destructor)) finalize_ioc_threads(void)
{
if (ioc_thread_pool != NULL) {
hg_thread_pool_destroy(ioc_thread_pool);
ioc_thread_pool = NULL;
}
}
static HG_THREAD_RETURN_TYPE
handle_work_request(void *arg)
{
hg_thread_ret_t ret = 0;
sf_work_request_t *msg = (sf_work_request_t *)arg;
int status = 0;
atomic_fetch_add(&sf_work_pending, 1); // atomic
switch(msg->tag) {
case WRITE_COLL:
status = queue_write_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case READ_COLL:
status = queue_read_coll( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case WRITE_INDEP:
status = queue_write_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case READ_INDEP:
status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case CLOSE_OP:
status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
subfiling_close_file);
break;
case OPEN_OP:
status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
default:
printf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, msg->source);
status = -1;
break;
}
atomic_fetch_sub(&sf_work_pending, 1); // atomic
if (status < 0) {
printf("[ioc(%d) %s]: Error encounted processing request(%x) from rank(%d\n",
msg->subfile_rank, __func__, msg->tag, msg->source);
fflush(stdout);
}
return ret;
}
int tpool_add_work(sf_work_request_t *work)
{
static int work_index = 0;
hg_thread_mutex_lock(&ioc_mutex);
if (work_index == POOL_CONCURRENT_MAX)
work_index = 0;
pool_request[work_index].func = handle_work_request;
pool_request[work_index].args = work;
hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]);
hg_thread_mutex_unlock(&ioc_mutex);
return 0;
}
bool tpool_is_empty(void)
{
return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue);
}
int begin_thread_exclusive(void)
{
return hg_thread_mutex_lock(&ioc_thread_mutex);
}
int end_thread_exclusive(void)
{
return hg_thread_mutex_unlock(&ioc_thread_mutex);
}
int wait_for_thread_main(void)
{
if (hg_thread_join(ioc_thread) == 0)
puts("thread_join succeeded");
else {
puts("thread_join failed");
return -1;
}
return 0;
}

1314
src/H5FDsubfiling.c Normal file

File diff suppressed because it is too large Load Diff

78
src/H5FDsubfiling.h Normal file
View File

@@ -0,0 +1,78 @@
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* Copyright by The HDF Group. *
* Copyright by the Board of Trustees of the University of Illinois. *
* All rights reserved. *
* *
* This file is part of HDF5. The full HDF5 copyright notice, including *
* terms governing use, modification, and redistribution, is contained in *
* the COPYING file, which can be found at the root of the source code *
* distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
* If you do not have access to either file, you may request a copy from *
* help@hdfgroup.org. *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
/*
* Programmer: Robb Matzke <matzke@llnl.gov>
* Monday, August 2, 1999
*
* Purpose: The public header file for the subfiling driver.
*/
#ifndef H5FDsubfiling_H
#define H5FDsubfiling_H
#define H5FD_SUBFILING (H5FD_subfiling_init())
/****************************************************************************
*
* Structure: H5FD_subfiling_fapl_t
*
* Purpose:
*
* H5FD_subfiling_fapl_t is a public structure that is used to pass
* subfiling configuration data to the appropriate subfiling VFD via
* the FAPL. A pointer to an instance of this structure is a parameter
* to H5Pset_fapl_subfiling() and H5Pget_fapl_subfiling().
*
* `version` (int32_t)
*
* Version number of the H5FD_subfiling_fapl_t structure. Any instance
* passed to the above calls must have a recognized version number, or
* an error will be flagged.
*
* This field should be set to H5FD_CURR_SUBFILING_FAPL_T_VERSION.
*
*
* Add fields needed to configure the subfiling VFD here.
*
* Note that we have to be able to copy FAPL entries -- thus use of
* variable size fields (i.e. pointers to strings, etc) will complicate
* matters.
*
****************************************************************************/
#define H5FD_CURR_SUBFILING_FAPL_T_VERSION 1
typedef struct H5FD_subfiling_fapl_t {
int32_t version;
/* add configuration fields here */
} H5FD_subfiling_fapl_t;
#ifdef __cplusplus
extern "C" {
#endif
H5_DLL hid_t H5FD_subfiling_init(void);
H5_DLL herr_t H5Pget_fapl_subfiling(hid_t fapl_id,
H5FD_subfiling_fapl_t *fa_out);
H5_DLL herr_t H5Pset_fapl_subfiling(hid_t fapl_id, H5FD_subfiling_fapl_t *fa);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -63,7 +63,7 @@ libhdf5_la_SOURCES= H5.c H5checksum.c H5dbg.c H5lib_settings.c H5system.c \
H5FAint.c H5FAstat.c H5FAtest.c \
H5FD.c H5FDcore.c H5FDfamily.c H5FDhdfs.c H5FDint.c H5FDlog.c \
H5FDmulti.c H5FDsec2.c H5FDspace.c \
H5FDsplitter.c H5FDstdio.c H5FDtest.c \
H5FDsplitter.c H5FDstdio.c H5FDsubfiling.c H5FDtest.c \
H5FL.c H5FO.c H5FS.c H5FScache.c H5FSdbg.c H5FSint.c H5FSsection.c \
H5FSstat.c H5FStest.c \
H5G.c H5Gbtree2.c H5Gcache.c H5Gcompact.c H5Gdense.c H5Gdeprec.c \
@@ -118,7 +118,8 @@ libhdf5_la_SOURCES= H5.c H5checksum.c H5dbg.c H5lib_settings.c H5system.c \
# Only compile parallel sources if necessary
if BUILD_PARALLEL_CONDITIONAL
libhdf5_la_SOURCES += H5mpi.c H5ACmpio.c H5Cmpio.c H5Dmpio.c H5Fmpi.c H5FDmpi.c H5FDmpio.c H5Smpio.c
libhdf5_la_SOURCES += H5mpi.c H5ACmpio.c H5Cmpio.c H5Dmpio.c H5Fmpi.c H5FDmpi.c H5FDmpio.c H5Smpio.c \
H5FDsubfile.c H5FDsubfile_threads.c H5FDsubfile_mpi.c
endif
# Only compile the direct VFD if necessary
@@ -143,14 +144,15 @@ include_HEADERS = hdf5.h H5api_adpt.h H5overflow.h H5pubconf.h H5public.h H5vers
H5Epubgen.h H5Epublic.h H5ESpublic.h H5Fpublic.h \
H5FDpublic.h H5FDcore.h H5FDdirect.h H5FDfamily.h H5FDhdfs.h \
H5FDlog.h H5FDmirror.h H5FDmpi.h H5FDmpio.h H5FDmulti.h H5FDros3.h \
H5FDsec2.h H5FDsplitter.h H5FDstdio.h H5FDwindows.h \
H5FDsec2.h H5FDsplitter.h H5FDstdio.h H5FDsubfiling.h H5FDwindows.h \
H5Gpublic.h H5Ipublic.h H5Lpublic.h \
H5Mpublic.h H5MMpublic.h H5Opublic.h H5Ppublic.h \
H5PLextern.h H5PLpublic.h \
H5Rpublic.h H5Spublic.h H5Tpublic.h \
H5VLconnector.h H5VLconnector_passthru.h \
H5VLnative.h H5VLpassthru.h H5VLpublic.h \
H5Zpublic.h
H5Zpublic.h H5FDsubfile_public.h H5FDsubfile_private.h \
mercury/mercury_thread.h mercury/mercury_thread_mutex.h mercury/mercury_log.h
# install libhdf5.settings in lib directory
settingsdir=$(libdir)

View File

@@ -51,6 +51,7 @@
#include "H5FDmulti.h" /* Usage-partitioned file family */
#include "H5FDros3.h" /* R/O S3 "file" I/O */
#include "H5FDsec2.h" /* POSIX unbuffered file I/O */
#include "H5FDsubfiling.h" /* subfiling */
#include "H5FDsplitter.h" /* Twin-channel (R/W & R/O) I/O passthrough */
#include "H5FDstdio.h" /* Standard C buffered I/O */
#ifdef H5_HAVE_WINDOWS

View File

@@ -0,0 +1,637 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_ATOMIC_H
#define MERCURY_ATOMIC_H
#include "mercury_util_config.h"
#if defined(_WIN32)
# include <windows.h>
typedef struct {
volatile LONG value;
} hg_atomic_int32_t;
typedef struct {
volatile LONGLONG value;
} hg_atomic_int64_t;
# define HG_ATOMIC_VAR_INIT(x) \
{ \
(x) \
}
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
# include <opa_primitives.h>
typedef OPA_int_t hg_atomic_int32_t;
typedef OPA_ptr_t hg_atomic_int64_t; /* OPA has only limited 64-bit support */
# define HG_ATOMIC_VAR_INIT(x) OPA_PTR_T_INITIALIZER(x)
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
# include <stdatomic.h>
typedef atomic_int hg_atomic_int32_t;
# if HG_UTIL_ATOMIC_LONG_WIDTH == 8
typedef atomic_long hg_atomic_int64_t;
# else
typedef atomic_llong hg_atomic_int64_t;
# endif
# define HG_ATOMIC_VAR_INIT(x) ATOMIC_VAR_INIT(x)
#elif defined(__APPLE__)
# include <libkern/OSAtomic.h>
typedef struct {
volatile hg_util_int32_t value;
} hg_atomic_int32_t;
typedef struct {
volatile hg_util_int64_t value;
} hg_atomic_int64_t;
# define HG_ATOMIC_VAR_INIT(x) \
{ \
(x) \
}
#else
# error "Not supported on this platform."
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Init atomic value (32-bit integer).
*
* \param ptr [OUT] pointer to an atomic32 integer
* \param value [IN] value
*/
static HG_UTIL_INLINE void
hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
/**
* Set atomic value (32-bit integer).
*
* \param ptr [OUT] pointer to an atomic32 integer
* \param value [IN] value
*/
static HG_UTIL_INLINE void
hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
/**
* Get atomic value (32-bit integer).
*
* \param ptr [OUT] pointer to an atomic32 integer
*
* \return Value of the atomic integer
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_get32(hg_atomic_int32_t *ptr);
/**
* Increment atomic value (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
*
* \return Incremented value
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_incr32(hg_atomic_int32_t *ptr);
/**
* Decrement atomic value (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
*
* \return Decremented value
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_decr32(hg_atomic_int32_t *ptr);
/**
* OR atomic value (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
* \param value [IN] value to OR with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
/**
* XOR atomic value (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
* \param value [IN] value to XOR with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
/**
* AND atomic value (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
* \param value [IN] value to AND with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value);
/**
* Compare and swap values (32-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic32 integer
* \param compare_value [IN] value to compare to
* \param swap_value [IN] value to swap with if ptr value is equal to
* compare value
*
* \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE
*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value,
hg_util_int32_t swap_value);
/**
* Init atomic value (64-bit integer).
*
* \param ptr [OUT] pointer to an atomic32 integer
* \param value [IN] value
*/
static HG_UTIL_INLINE void
hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
/**
* Set atomic value (64-bit integer).
*
* \param ptr [OUT] pointer to an atomic64 integer
* \param value [IN] value
*/
static HG_UTIL_INLINE void
hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
/**
* Get atomic value (64-bit integer).
*
* \param ptr [OUT] pointer to an atomic64 integer
*
* \return Value of the atomic integer
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_get64(hg_atomic_int64_t *ptr);
/**
* Increment atomic value (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
*
* \return Incremented value
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_incr64(hg_atomic_int64_t *ptr);
/**
* Decrement atomic value (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
*
* \return Decremented value
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_decr64(hg_atomic_int64_t *ptr);
/**
* OR atomic value (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
* \param value [IN] value to OR with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
/**
* XOR atomic value (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
* \param value [IN] value to XOR with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
/**
* AND atomic value (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
* \param value [IN] value to AND with
*
* \return Original value
*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value);
/**
* Compare and swap values (64-bit integer).
*
* \param ptr [IN/OUT] pointer to an atomic64 integer
* \param compare_value [IN] value to compare to
* \param swap_value [IN] value to swap with if ptr value is equal to
* compare value
*
* \return HG_UTIL_TRUE if swapped or HG_UTIL_FALSE
*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value,
hg_util_int64_t swap_value);
/**
* Memory barrier.
*
*/
static HG_UTIL_INLINE void
hg_atomic_fence(void);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void
hg_atomic_init32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
{
#if defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
atomic_init(ptr, value);
#else
hg_atomic_set32(ptr, value);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void
hg_atomic_set32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
{
#if defined(_WIN32)
ptr->value = value;
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
OPA_store_int(ptr, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
atomic_store_explicit(ptr, value, memory_order_release);
#elif defined(__APPLE__)
ptr->value = value;
#else
# error "Not supported on this platform."
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_get32(hg_atomic_int32_t *ptr)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = ptr->value;
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = OPA_load_int(ptr);
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_load_explicit(ptr, memory_order_acquire);
#elif defined(__APPLE__)
ret = ptr->value;
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_incr32(hg_atomic_int32_t *ptr)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = InterlockedIncrementNoFence(&ptr->value);
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = OPA_fetch_and_incr_int(ptr) + 1;
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1;
#elif defined(__APPLE__)
ret = OSAtomicIncrement32(&ptr->value);
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_decr32(hg_atomic_int32_t *ptr)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = InterlockedDecrementNoFence(&ptr->value);
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = OPA_fetch_and_decr_int(ptr) - 1;
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1;
#elif defined(__APPLE__)
ret = OSAtomicDecrement32(&ptr->value);
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_or32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = InterlockedOrNoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel);
#elif defined(__APPLE__)
ret = OSAtomicOr32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value);
#else
do {
ret = hg_atomic_get32(ptr);
} while (!hg_atomic_cas32(ptr, ret, (ret | value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_xor32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = InterlockedXorNoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel);
#elif defined(__APPLE__)
ret =
OSAtomicXor32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value);
#else
do {
ret = hg_atomic_get32(ptr);
} while (!hg_atomic_cas32(ptr, ret, (ret ^ value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int32_t
hg_atomic_and32(hg_atomic_int32_t *ptr, hg_util_int32_t value)
{
hg_util_int32_t ret;
#if defined(_WIN32)
ret = InterlockedAndNoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel);
#elif defined(__APPLE__)
ret =
OSAtomicAnd32Orig((uint32_t) value, (volatile uint32_t *) &ptr->value);
#else
do {
ret = hg_atomic_get32(ptr);
} while (!hg_atomic_cas32(ptr, ret, (ret & value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_cas32(hg_atomic_int32_t *ptr, hg_util_int32_t compare_value,
hg_util_int32_t swap_value)
{
hg_util_bool_t ret;
#if defined(_WIN32)
ret = (compare_value == InterlockedCompareExchangeNoFence(
&ptr->value, swap_value, compare_value));
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = (hg_util_bool_t)(
compare_value == OPA_cas_int(ptr, compare_value, swap_value));
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_compare_exchange_strong_explicit(ptr, &compare_value,
swap_value, memory_order_acq_rel, memory_order_acquire);
#elif defined(__APPLE__)
ret = OSAtomicCompareAndSwap32(compare_value, swap_value, &ptr->value);
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void
hg_atomic_init64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
{
#if defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
atomic_init(ptr, value);
#else
hg_atomic_set64(ptr, value);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void
hg_atomic_set64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
{
#if defined(_WIN32)
ptr->value = value;
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
OPA_store_ptr(ptr, (void *) value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
atomic_store_explicit(ptr, value, memory_order_release);
#elif defined(__APPLE__)
ptr->value = value;
#else
# error "Not supported on this platform."
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_get64(hg_atomic_int64_t *ptr)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = ptr->value;
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = (hg_util_int64_t) OPA_load_ptr(ptr);
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_load_explicit(ptr, memory_order_acquire);
#elif defined(__APPLE__)
ptr->value = value;
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_incr64(hg_atomic_int64_t *ptr)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = InterlockedIncrementNoFence64(&ptr->value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_add_explicit(ptr, 1, memory_order_acq_rel) + 1;
#elif defined(__APPLE__)
ret = OSAtomicIncrement64(&ptr->value);
#else
do {
ret = hg_atomic_get64(ptr);
} while (!hg_atomic_cas64(ptr, ret, ret + 1));
ret++;
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_decr64(hg_atomic_int64_t *ptr)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = InterlockedDecrementNoFence64(&ptr->value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_sub_explicit(ptr, 1, memory_order_acq_rel) - 1;
#elif defined(__APPLE__)
ret = OSAtomicDecrement64(&ptr->value);
#else
do {
ret = hg_atomic_get64(ptr);
} while (!hg_atomic_cas64(ptr, ret, ret - 1));
ret--;
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_or64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = InterlockedOr64NoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_or_explicit(ptr, value, memory_order_acq_rel);
#else
do {
ret = hg_atomic_get64(ptr);
} while (!hg_atomic_cas64(ptr, ret, (ret | value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_xor64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = InterlockedXor64NoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_xor_explicit(ptr, value, memory_order_acq_rel);
#else
do {
ret = hg_atomic_get64(ptr);
} while (!hg_atomic_cas64(ptr, ret, (ret ^ value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_int64_t
hg_atomic_and64(hg_atomic_int64_t *ptr, hg_util_int64_t value)
{
hg_util_int64_t ret;
#if defined(_WIN32)
ret = InterlockedAnd64NoFence(&ptr->value, value);
#elif defined(HG_UTIL_HAS_STDATOMIC_H) && !defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = atomic_fetch_and_explicit(ptr, value, memory_order_acq_rel);
#else
do {
ret = hg_atomic_get64(ptr);
} while (!hg_atomic_cas64(ptr, ret, (ret & value)));
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_cas64(hg_atomic_int64_t *ptr, hg_util_int64_t compare_value,
hg_util_int64_t swap_value)
{
hg_util_bool_t ret;
#if defined(_WIN32)
ret = (compare_value == InterlockedCompareExchangeNoFence64(
&ptr->value, swap_value, compare_value));
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
ret = (hg_util_bool_t)(
compare_value == (hg_util_int64_t) OPA_cas_ptr(
ptr, (void *) compare_value, (void *) swap_value));
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
ret = atomic_compare_exchange_strong_explicit(ptr, &compare_value,
swap_value, memory_order_acq_rel, memory_order_acquire);
#elif defined(__APPLE__)
ret = OSAtomicCompareAndSwap64(compare_value, swap_value, &ptr->value);
#else
# error "Not supported on this platform."
#endif
return ret;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void
hg_atomic_fence()
{
#if defined(_WIN32)
MemoryBarrier();
#elif defined(HG_UTIL_HAS_OPA_PRIMITIVES_H)
OPA_read_write_barrier();
#elif defined(HG_UTIL_HAS_STDATOMIC_H)
atomic_thread_fence(memory_order_acq_rel);
#elif defined(__APPLE__)
OSMemoryBarrier();
#else
# error "Not supported on this platform."
#endif
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_ATOMIC_H */

View File

@@ -0,0 +1,83 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Implementation derived from:
* https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
*
* -
* Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include "mercury_atomic_queue.h"
#include "mercury_util_error.h"
#include <stdlib.h>
/****************/
/* Local Macros */
/****************/
/* From <sys/param.h> */
#define powerof2(x) ((((x) -1) & (x)) == 0)
/*---------------------------------------------------------------------------*/
struct hg_atomic_queue *
hg_atomic_queue_alloc(unsigned int count)
{
struct hg_atomic_queue *hg_atomic_queue = NULL;
HG_UTIL_CHECK_ERROR_NORET(
!powerof2(count), done, "atomic queue size must be power of 2");
hg_atomic_queue = malloc(
sizeof(struct hg_atomic_queue) + count * sizeof(hg_atomic_int64_t));
HG_UTIL_CHECK_ERROR_NORET(
hg_atomic_queue == NULL, done, "Could not allocate atomic queue");
hg_atomic_queue->prod_size = hg_atomic_queue->cons_size = count;
hg_atomic_queue->prod_mask = hg_atomic_queue->cons_mask = count - 1;
hg_atomic_init32(&hg_atomic_queue->prod_head, 0);
hg_atomic_init32(&hg_atomic_queue->cons_head, 0);
hg_atomic_init32(&hg_atomic_queue->prod_tail, 0);
hg_atomic_init32(&hg_atomic_queue->cons_tail, 0);
done:
return hg_atomic_queue;
}
/*---------------------------------------------------------------------------*/
void
hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue)
{
free(hg_atomic_queue);
}

View File

@@ -0,0 +1,271 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Implementation derived from:
* https://github.com/freebsd/freebsd/blob/master/sys/sys/buf_ring.h
*
* -
* Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#ifndef MERCURY_ATOMIC_QUEUE_H
#define MERCURY_ATOMIC_QUEUE_H
#include "mercury_atomic.h"
#include "mercury_mem.h"
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
struct hg_atomic_queue {
hg_atomic_int32_t prod_head;
hg_atomic_int32_t prod_tail;
unsigned int prod_size;
unsigned int prod_mask;
hg_util_uint64_t drops;
hg_atomic_int32_t cons_head
__attribute__((aligned(HG_MEM_CACHE_LINE_SIZE)));
hg_atomic_int32_t cons_tail;
unsigned int cons_size;
unsigned int cons_mask;
hg_atomic_int64_t ring[] __attribute__((aligned(HG_MEM_CACHE_LINE_SIZE)));
};
/*****************/
/* Public Macros */
/*****************/
#ifndef cpu_spinwait
# if defined(__x86_64__) || defined(__amd64__)
# define cpu_spinwait() asm volatile("pause\n" : : : "memory");
# else
# define cpu_spinwait() ;
# endif
#endif
/*********************/
/* Public Prototypes */
/*********************/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Allocate a new queue that can hold \count elements.
*
* \param count [IN] maximum number of elements
*
* \return pointer to allocated queue or NULL on failure
*/
HG_UTIL_PUBLIC struct hg_atomic_queue *
hg_atomic_queue_alloc(unsigned int count);
/**
* Free an existing queue.
*
* \param hg_atomic_queue [IN] pointer to queue
*/
HG_UTIL_PUBLIC void
hg_atomic_queue_free(struct hg_atomic_queue *hg_atomic_queue);
/**
* Push an entry to the queue.
*
* \param hg_atomic_queue [IN/OUT] pointer to queue
* \param entry [IN] pointer to object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry);
/**
* Pop an entry from the queue (multi-consumer).
*
* \param hg_atomic_queue [IN/OUT] pointer to queue
*
* \return Pointer to popped object or NULL if queue is empty
*/
static HG_UTIL_INLINE void *
hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue);
/**
* Pop an entry from the queue (single consumer).
*
* \param hg_atomic_queue [IN/OUT] pointer to queue
*
* \return Pointer to popped object or NULL if queue is empty
*/
static HG_UTIL_INLINE void *
hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue);
/**
* Determine whether queue is empty.
*
* \param hg_atomic_queue [IN/OUT] pointer to queue
*
* \return HG_UTIL_TRUE if empty, HG_UTIL_FALSE if not
*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue);
/**
* Determine number of entries in a queue.
*
* \param hg_atomic_queue [IN/OUT] pointer to queue
*
* \return Number of entries queued or 0 if none
*/
static HG_UTIL_INLINE unsigned int
hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_atomic_queue_push(struct hg_atomic_queue *hg_atomic_queue, void *entry)
{
hg_util_int32_t prod_head, prod_next, cons_tail;
do {
prod_head = hg_atomic_get32(&hg_atomic_queue->prod_head);
prod_next = (prod_head + 1) & (int) hg_atomic_queue->prod_mask;
cons_tail = hg_atomic_get32(&hg_atomic_queue->cons_tail);
if (prod_next == cons_tail) {
hg_atomic_fence();
if (prod_head == hg_atomic_get32(&hg_atomic_queue->prod_head) &&
cons_tail == hg_atomic_get32(&hg_atomic_queue->cons_tail)) {
hg_atomic_queue->drops++;
/* Full */
return HG_UTIL_FAIL;
}
continue;
}
} while (
!hg_atomic_cas32(&hg_atomic_queue->prod_head, prod_head, prod_next));
hg_atomic_set64(&hg_atomic_queue->ring[prod_head], (hg_util_int64_t) entry);
/*
* If there are other enqueues in progress
* that preceded us, we need to wait for them
* to complete
*/
while (hg_atomic_get32(&hg_atomic_queue->prod_tail) != prod_head)
cpu_spinwait();
hg_atomic_set32(&hg_atomic_queue->prod_tail, prod_next);
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void *
hg_atomic_queue_pop_mc(struct hg_atomic_queue *hg_atomic_queue)
{
hg_util_int32_t cons_head, cons_next;
void *entry = NULL;
do {
cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask;
if (cons_head == hg_atomic_get32(&hg_atomic_queue->prod_tail))
return NULL;
} while (
!hg_atomic_cas32(&hg_atomic_queue->cons_head, cons_head, cons_next));
entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]);
/*
* If there are other dequeues in progress
* that preceded us, we need to wait for them
* to complete
*/
while (hg_atomic_get32(&hg_atomic_queue->cons_tail) != cons_head)
cpu_spinwait();
hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next);
return entry;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void *
hg_atomic_queue_pop_sc(struct hg_atomic_queue *hg_atomic_queue)
{
hg_util_int32_t cons_head, cons_next;
hg_util_int32_t prod_tail;
void *entry = NULL;
cons_head = hg_atomic_get32(&hg_atomic_queue->cons_head);
prod_tail = hg_atomic_get32(&hg_atomic_queue->prod_tail);
cons_next = (cons_head + 1) & (int) hg_atomic_queue->cons_mask;
if (cons_head == prod_tail)
/* Empty */
return NULL;
hg_atomic_set32(&hg_atomic_queue->cons_head, cons_next);
entry = (void *) hg_atomic_get64(&hg_atomic_queue->ring[cons_head]);
hg_atomic_set32(&hg_atomic_queue->cons_tail, cons_next);
return entry;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_bool_t
hg_atomic_queue_is_empty(struct hg_atomic_queue *hg_atomic_queue)
{
return (hg_atomic_get32(&hg_atomic_queue->cons_head) ==
hg_atomic_get32(&hg_atomic_queue->prod_tail));
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE unsigned int
hg_atomic_queue_count(struct hg_atomic_queue *hg_atomic_queue)
{
return ((hg_atomic_queue->prod_size +
(unsigned int) hg_atomic_get32(&hg_atomic_queue->prod_tail) -
(unsigned int) hg_atomic_get32(&hg_atomic_queue->cons_tail)) &
hg_atomic_queue->prod_mask);
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_ATOMIC_QUEUE_H */

View File

@@ -0,0 +1,72 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_event.h"
#include "mercury_util_error.h"
/*---------------------------------------------------------------------------*/
int
hg_event_create(void)
{
int fd = -1;
#if defined(_WIN32)
#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
/* Create local signal event on self address */
fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
HG_UTIL_CHECK_ERROR_NORET(
fd == -1, done, "eventfd() failed (%s)", strerror(errno));
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
struct kevent kev;
struct timespec timeout = {0, 0};
int rc;
/* Create kqueue */
fd = kqueue();
HG_UTIL_CHECK_ERROR_NORET(
fd == -1, done, "kqueue() failed (%s)", strerror(errno));
EV_SET(&kev, HG_EVENT_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
/* Add user-defined event to kqueue */
rc = kevent(fd, &kev, 1, NULL, 0, &timeout);
HG_UTIL_CHECK_ERROR_NORET(
rc == -1, error, "kevent() failed (%s)", strerror(errno));
#else
#endif
done:
return fd;
#if defined(HG_UTIL_HAS_SYSEVENT_H)
error:
hg_event_destroy(fd);
return -1;
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_event_destroy(int fd)
{
int ret = HG_UTIL_SUCCESS, rc;
#if defined(_WIN32)
#else
rc = close(fd);
HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL,
"close() failed (%s)", strerror(errno));
#endif
done:
return ret;
}

184
src/mercury/mercury_event.h Normal file
View File

@@ -0,0 +1,184 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_EVENT_H
#define MERCURY_EVENT_H
#include "mercury_util_config.h"
#ifdef _WIN32
#else
# include <errno.h>
# include <string.h>
# include <unistd.h>
# if defined(HG_UTIL_HAS_SYSEVENTFD_H)
# include <sys/eventfd.h>
# ifndef HG_UTIL_HAS_EVENTFD_T
typedef uint64_t eventfd_t;
# endif
# elif defined(HG_UTIL_HAS_SYSEVENT_H)
# include <sys/event.h>
# define HG_EVENT_IDENT 42 /* User-defined ident */
# endif
#endif
/**
* Purpose: define an event object that can be used as an event
* wait/notify mechanism.
*/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Create a new event object.
*
* \return file descriptor on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_event_create(void);
/**
* Destroy an event object.
*
* \param fd [IN] event file descriptor
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_event_destroy(int fd);
/**
* Notify for event.
*
* \param fd [IN] event file descriptor
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_event_set(int fd);
/**
* Get event notification.
*
* \param fd [IN] event file descriptor
* \param notified [IN] boolean set to HG_UTIL_TRUE if event received
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_event_get(int fd, hg_util_bool_t *notified);
/*---------------------------------------------------------------------------*/
#if defined(_WIN32)
/* TODO */
#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
# ifdef HG_UTIL_HAS_EVENTFD_T
static HG_UTIL_INLINE int
hg_event_set(int fd)
{
return (eventfd_write(fd, 1) == 0) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
}
# else
static HG_UTIL_INLINE int
hg_event_set(int fd)
{
eventfd_t count = 1;
ssize_t s = write(fd, &count, sizeof(eventfd_t));
return (s == sizeof(eventfd_t)) ? HG_UTIL_SUCCESS : HG_UTIL_FAIL;
}
# endif
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
static HG_UTIL_INLINE int
hg_event_set(int fd)
{
struct kevent kev;
struct timespec timeout = {0, 0};
int rc;
EV_SET(&kev, HG_EVENT_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
/* Trigger user-defined event */
rc = kevent(fd, &kev, 1, NULL, 0, &timeout);
return (rc == -1) ? HG_UTIL_FAIL : HG_UTIL_SUCCESS;
}
#else
# error "Not supported on this platform."
#endif
/*---------------------------------------------------------------------------*/
#if defined(_WIN32)
#elif defined(HG_UTIL_HAS_SYSEVENTFD_H)
# ifdef HG_UTIL_HAS_EVENTFD_T
static HG_UTIL_INLINE int
hg_event_get(int fd, hg_util_bool_t *signaled)
{
eventfd_t count = 0;
if ((eventfd_read(fd, &count) == 0) && count)
*signaled = HG_UTIL_TRUE;
else {
if (errno == EAGAIN)
*signaled = HG_UTIL_FALSE;
else
return HG_UTIL_FAIL;
}
return HG_UTIL_SUCCESS;
}
# else
static HG_UTIL_INLINE int
hg_event_get(int fd, hg_util_bool_t *signaled)
{
eventfd_t count = 0;
ssize_t s = read(fd, &count, sizeof(eventfd_t));
if ((s == sizeof(eventfd_t)) && count)
*signaled = HG_UTIL_TRUE;
else {
if (errno == EAGAIN)
*signaled = HG_UTIL_FALSE;
else
return HG_UTIL_FAIL;
}
return HG_UTIL_SUCCESS;
}
# endif
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
static HG_UTIL_INLINE int
hg_event_get(int fd, hg_util_bool_t *signaled)
{
struct kevent kev;
int nfds;
struct timespec timeout = {0, 0};
/* Check user-defined event */
nfds = kevent(fd, NULL, 0, &kev, 1, &timeout);
if (nfds == -1)
return HG_UTIL_FAIL;
*signaled = ((nfds > 0) && (kev.ident == HG_EVENT_IDENT)) ? HG_UTIL_TRUE
: HG_UTIL_FALSE;
return HG_UTIL_SUCCESS;
}
#else
# error "Not supported on this platform."
#endif
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_EVENT_H */

View File

@@ -0,0 +1,48 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_HASH_STRING_H
#define MERCURY_HASH_STRING_H
#include "mercury_util_config.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* Hash function name for unique ID to register.
*
* \param string [IN] string name
*
* \return Non-negative ID that corresponds to string name
*/
static HG_UTIL_INLINE unsigned int
hg_hash_string(const char *string)
{
/* This is the djb2 string hash function */
unsigned int result = 5381;
const unsigned char *p;
p = (const unsigned char *) string;
while (*p != '\0') {
result = (result << 5) + result + *p;
++p;
}
return result;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_HASH_STRING_H */

View File

@@ -0,0 +1,526 @@
/*
Copyright (c) 2005-2008, Simon Howard
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/* Hash table implementation */
#include "mercury_hash_table.h"
#include <stdlib.h>
#include <string.h>
struct hg_hash_table_entry {
hg_hash_table_key_t key;
hg_hash_table_value_t value;
hg_hash_table_entry_t *next;
};
struct hg_hash_table {
hg_hash_table_entry_t **table;
unsigned int table_size;
hg_hash_table_hash_func_t hash_func;
hg_hash_table_equal_func_t equal_func;
hg_hash_table_key_free_func_t key_free_func;
hg_hash_table_value_free_func_t value_free_func;
unsigned int entries;
unsigned int prime_index;
};
/* This is a set of good hash table prime numbers, from:
* http://planetmath.org/goodhashtableprimes
* Each prime is roughly double the previous value, and as far as
* possible from the nearest powers of two. */
static const unsigned int hash_table_primes[] = {
193,
389,
769,
1543,
3079,
6151,
12289,
24593,
49157,
98317,
196613,
393241,
786433,
1572869,
3145739,
6291469,
12582917,
25165843,
50331653,
100663319,
201326611,
402653189,
805306457,
1610612741,
};
static const unsigned int hash_table_num_primes =
sizeof(hash_table_primes) / sizeof(int);
/* Internal function used to allocate the table on hash table creation
* and when enlarging the table */
static int
hash_table_allocate_table(hg_hash_table_t *hash_table)
{
unsigned int new_table_size;
/* Determine the table size based on the current prime index.
* An attempt is made here to ensure sensible behavior if the
* maximum prime is exceeded, but in practice other things are
* likely to break long before that happens. */
if (hash_table->prime_index < hash_table_num_primes) {
new_table_size = hash_table_primes[hash_table->prime_index];
} else {
new_table_size = hash_table->entries * 10;
}
hash_table->table_size = new_table_size;
/* Allocate the table and initialise to NULL for all entries */
hash_table->table = (hg_hash_table_entry_t **) calloc(
hash_table->table_size, sizeof(hg_hash_table_entry_t *));
return hash_table->table != NULL;
}
/* Free an entry, calling the free functions if there are any registered */
static void
hash_table_free_entry(hg_hash_table_t *hash_table, hg_hash_table_entry_t *entry)
{
/* If there is a function registered for freeing keys, use it to free
* the key */
if (hash_table->key_free_func != NULL) {
hash_table->key_free_func(entry->key);
}
/* Likewise with the value */
if (hash_table->value_free_func != NULL) {
hash_table->value_free_func(entry->value);
}
/* Free the data structure */
free(entry);
}
hg_hash_table_t *
hg_hash_table_new(
hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func)
{
hg_hash_table_t *hash_table;
/* Allocate a new hash table structure */
hash_table = (hg_hash_table_t *) malloc(sizeof(hg_hash_table_t));
if (hash_table == NULL) {
return NULL;
}
hash_table->hash_func = hash_func;
hash_table->equal_func = equal_func;
hash_table->key_free_func = NULL;
hash_table->value_free_func = NULL;
hash_table->entries = 0;
hash_table->prime_index = 0;
/* Allocate the table */
if (!hash_table_allocate_table(hash_table)) {
free(hash_table);
return NULL;
}
return hash_table;
}
void
hg_hash_table_free(hg_hash_table_t *hash_table)
{
hg_hash_table_entry_t *rover;
hg_hash_table_entry_t *next;
unsigned int i;
/* Free all entries in all chains */
for (i = 0; i < hash_table->table_size; ++i) {
rover = hash_table->table[i];
while (rover != NULL) {
next = rover->next;
hash_table_free_entry(hash_table, rover);
rover = next;
}
}
/* Free the table */
free(hash_table->table);
/* Free the hash table structure */
free(hash_table);
}
void
hg_hash_table_register_free_functions(hg_hash_table_t *hash_table,
hg_hash_table_key_free_func_t key_free_func,
hg_hash_table_value_free_func_t value_free_func)
{
hash_table->key_free_func = key_free_func;
hash_table->value_free_func = value_free_func;
}
static int
hash_table_enlarge(hg_hash_table_t *hash_table)
{
hg_hash_table_entry_t **old_table;
unsigned int old_table_size;
unsigned int old_prime_index;
hg_hash_table_entry_t *rover;
hg_hash_table_entry_t *next;
unsigned int entry_index;
unsigned int i;
/* Store a copy of the old table */
old_table = hash_table->table;
old_table_size = hash_table->table_size;
old_prime_index = hash_table->prime_index;
/* Allocate a new, larger table */
++hash_table->prime_index;
if (!hash_table_allocate_table(hash_table)) {
/* Failed to allocate the new table */
hash_table->table = old_table;
hash_table->table_size = old_table_size;
hash_table->prime_index = old_prime_index;
return 0;
}
/* Link all entries from all chains into the new table */
for (i = 0; i < old_table_size; ++i) {
rover = old_table[i];
while (rover != NULL) {
next = rover->next;
/* Find the index into the new table */
entry_index =
hash_table->hash_func(rover->key) % hash_table->table_size;
/* Link this entry into the chain */
rover->next = hash_table->table[entry_index];
hash_table->table[entry_index] = rover;
/* Advance to next in the chain */
rover = next;
}
}
/* Free the old table */
free(old_table);
return 1;
}
int
hg_hash_table_insert(hg_hash_table_t *hash_table, hg_hash_table_key_t key,
hg_hash_table_value_t value)
{
hg_hash_table_entry_t *rover;
hg_hash_table_entry_t *newentry;
unsigned int entry_index;
/* If there are too many items in the table with respect to the table
* size, the number of hash collisions increases and performance
* decreases. Enlarge the table size to prevent this happening */
if ((hash_table->entries * 3) / hash_table->table_size > 0) {
/* Table is more than 1/3 full */
if (!hash_table_enlarge(hash_table)) {
/* Failed to enlarge the table */
return 0;
}
}
/* Generate the hash of the key and hence the index into the table */
entry_index = hash_table->hash_func(key) % hash_table->table_size;
/* Traverse the chain at this location and look for an existing
* entry with the same key */
rover = hash_table->table[entry_index];
while (rover != NULL) {
if (hash_table->equal_func(rover->key, key) != 0) {
/* Same key: overwrite this entry with new data */
/* If there is a value free function, free the old data
* before adding in the new data */
if (hash_table->value_free_func != NULL) {
hash_table->value_free_func(rover->value);
}
/* Same with the key: use the new key value and free
* the old one */
if (hash_table->key_free_func != NULL) {
hash_table->key_free_func(rover->key);
}
rover->key = key;
rover->value = value;
/* Finished */
return 1;
}
rover = rover->next;
}
/* Not in the hash table yet. Create a new entry */
newentry = (hg_hash_table_entry_t *) malloc(sizeof(hg_hash_table_entry_t));
if (newentry == NULL) {
return 0;
}
newentry->key = key;
newentry->value = value;
/* Link into the list */
newentry->next = hash_table->table[entry_index];
hash_table->table[entry_index] = newentry;
/* Maintain the count of the number of entries */
++hash_table->entries;
/* Added successfully */
return 1;
}
hg_hash_table_value_t
hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key)
{
hg_hash_table_entry_t *rover;
unsigned int entry_index;
/* Generate the hash of the key and hence the index into the table */
entry_index = hash_table->hash_func(key) % hash_table->table_size;
/* Walk the chain at this index until the corresponding entry is
* found */
rover = hash_table->table[entry_index];
while (rover != NULL) {
if (hash_table->equal_func(key, rover->key) != 0) {
/* Found the entry. Return the data. */
return rover->value;
}
rover = rover->next;
}
/* Not found */
return HG_HASH_TABLE_NULL;
}
int
hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key)
{
hg_hash_table_entry_t **rover;
hg_hash_table_entry_t *entry;
unsigned int entry_index;
int result;
/* Generate the hash of the key and hence the index into the table */
entry_index = hash_table->hash_func(key) % hash_table->table_size;
/* Rover points at the pointer which points at the current entry
* in the chain being inspected. ie. the entry in the table, or
* the "next" pointer of the previous entry in the chain. This
* allows us to unlink the entry when we find it. */
result = 0;
rover = &hash_table->table[entry_index];
while (*rover != NULL) {
if (hash_table->equal_func(key, (*rover)->key) != 0) {
/* This is the entry to remove */
entry = *rover;
/* Unlink from the list */
*rover = entry->next;
/* Destroy the entry structure */
hash_table_free_entry(hash_table, entry);
/* Track count of entries */
--hash_table->entries;
result = 1;
break;
}
/* Advance to the next entry */
rover = &((*rover)->next);
}
return result;
}
unsigned int
hg_hash_table_num_entries(hg_hash_table_t *hash_table)
{
return hash_table->entries;
}
void
hg_hash_table_iterate(
hg_hash_table_t *hash_table, hg_hash_table_iter_t *iterator)
{
unsigned int chain;
iterator->hash_table = hash_table;
/* Default value of next if no entries are found. */
iterator->next_entry = NULL;
/* Find the first entry */
for (chain = 0; chain < hash_table->table_size; ++chain) {
if (hash_table->table[chain] != NULL) {
iterator->next_entry = hash_table->table[chain];
iterator->next_chain = chain;
break;
}
}
}
int
hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator)
{
return iterator->next_entry != NULL;
}
hg_hash_table_value_t
hg_hash_table_iter_next(hg_hash_table_iter_t *iterator)
{
hg_hash_table_entry_t *current_entry;
hg_hash_table_t *hash_table;
hg_hash_table_value_t result;
unsigned int chain;
hash_table = iterator->hash_table;
/* No more entries? */
if (iterator->next_entry == NULL) {
return HG_HASH_TABLE_NULL;
}
/* Result is immediately available */
current_entry = iterator->next_entry;
result = current_entry->value;
/* Find the next entry */
if (current_entry->next != NULL) {
/* Next entry in current chain */
iterator->next_entry = current_entry->next;
} else {
/* None left in this chain, so advance to the next chain */
chain = iterator->next_chain + 1;
/* Default value if no next chain found */
iterator->next_entry = NULL;
while (chain < hash_table->table_size) {
/* Is there anything in this chain? */
if (hash_table->table[chain] != NULL) {
iterator->next_entry = hash_table->table[chain];
break;
}
/* Try the next chain */
++chain;
}
iterator->next_chain = chain;
}
return result;
}

View File

@@ -0,0 +1,252 @@
/*
Copyright (c) 2005-2008, Simon Howard
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/**
* \file mercury_hash_table.h
*
* \brief Hash table.
*
* A hash table stores a set of values which can be addressed by a
* key. Given the key, the corresponding value can be looked up
* quickly.
*
* To create a hash table, use \ref hg_hash_table_new. To destroy a
* hash table, use \ref hg_hash_table_free.
*
* To insert a value into a hash table, use \ref hg_hash_table_insert.
*
* To remove a value from a hash table, use \ref hg_hash_table_remove.
*
* To look up a value by its key, use \ref hg_hash_table_lookup.
*
* To iterate over all values in a hash table, use
* \ref hg_hash_table_iterate to initialize a \ref hg_hash_table_iter
* structure. Each value can then be read in turn using
* \ref hg_hash_table_iter_next and \ref hg_hash_table_iter_has_more.
*/
#ifndef HG_HASH_TABLE_H
#define HG_HASH_TABLE_H
#include "mercury_util_config.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* A hash table structure.
*/
typedef struct hg_hash_table hg_hash_table_t;
/**
* Structure used to iterate over a hash table.
*/
typedef struct hg_hash_table_iter hg_hash_table_iter_t;
/**
* Internal structure representing an entry in a hash table.
*/
typedef struct hg_hash_table_entry hg_hash_table_entry_t;
/**
* A key to look up a value in a \ref hg_hash_table_t.
*/
typedef void *hg_hash_table_key_t;
/**
* A value stored in a \ref hg_hash_table_t.
*/
typedef void *hg_hash_table_value_t;
/**
* Definition of a \ref hg_hash_table_iter.
*/
struct hg_hash_table_iter {
hg_hash_table_t *hash_table;
hg_hash_table_entry_t *next_entry;
unsigned int next_chain;
};
/**
* A null \ref HashTableValue.
*/
#define HG_HASH_TABLE_NULL ((void *) 0)
/**
* Hash function used to generate hash values for keys used in a hash
* table.
*
* \param value The value to generate a hash value for.
* \return The hash value.
*/
typedef unsigned int (*hg_hash_table_hash_func_t)(hg_hash_table_key_t value);
/**
* Function used to compare two keys for equality.
*
* \return Non-zero if the two keys are equal, zero if the keys are
* not equal.
*/
typedef int (*hg_hash_table_equal_func_t)(
hg_hash_table_key_t value1, hg_hash_table_key_t value2);
/**
* Type of function used to free keys when entries are removed from a
* hash table.
*/
typedef void (*hg_hash_table_key_free_func_t)(hg_hash_table_key_t value);
/**
* Type of function used to free values when entries are removed from a
* hash table.
*/
typedef void (*hg_hash_table_value_free_func_t)(hg_hash_table_value_t value);
/**
* Create a new hash table.
*
* \param hash_func Function used to generate hash keys for the
* keys used in the table.
* \param equal_func Function used to test keys used in the table
* for equality.
* \return A new hash table structure, or NULL if it
* was not possible to allocate the new hash
* table.
*/
HG_UTIL_PUBLIC hg_hash_table_t *
hg_hash_table_new(
hg_hash_table_hash_func_t hash_func, hg_hash_table_equal_func_t equal_func);
/**
* Destroy a hash table.
*
* \param hash_table The hash table to destroy.
*/
HG_UTIL_PUBLIC void
hg_hash_table_free(hg_hash_table_t *hash_table);
/**
* Register functions used to free the key and value when an entry is
* removed from a hash table.
*
* \param hash_table The hash table.
* \param key_free_func Function used to free keys.
* \param value_free_func Function used to free values.
*/
HG_UTIL_PUBLIC void
hg_hash_table_register_free_functions(hg_hash_table_t *hash_table,
hg_hash_table_key_free_func_t key_free_func,
hg_hash_table_value_free_func_t value_free_func);
/**
* Insert a value into a hash table, overwriting any existing entry
* using the same key.
*
* \param hash_table The hash table.
* \param key The key for the new value.
* \param value The value to insert.
* \return Non-zero if the value was added successfully,
* or zero if it was not possible to allocate
* memory for the new entry.
*/
HG_UTIL_PUBLIC int
hg_hash_table_insert(hg_hash_table_t *hash_table, hg_hash_table_key_t key,
hg_hash_table_value_t value);
/**
* Look up a value in a hash table by key.
*
* \param hash_table The hash table.
* \param key The key of the value to look up.
* \return The value, or \ref HASH_TABLE_NULL if there
* is no value with that key in the hash table.
*/
HG_UTIL_PUBLIC hg_hash_table_value_t
hg_hash_table_lookup(hg_hash_table_t *hash_table, hg_hash_table_key_t key);
/**
* Remove a value from a hash table.
*
* \param hash_table The hash table.
* \param key The key of the value to remove.
* \return Non-zero if a key was removed, or zero if the
* specified key was not found in the hash table.
*/
HG_UTIL_PUBLIC int
hg_hash_table_remove(hg_hash_table_t *hash_table, hg_hash_table_key_t key);
/**
* Retrieve the number of entries in a hash table.
*
* \param hash_table The hash table.
* \return The number of entries in the hash table.
*/
HG_UTIL_PUBLIC unsigned int
hg_hash_table_num_entries(hg_hash_table_t *hash_table);
/**
* Initialise a \ref HashTableIterator to iterate over a hash table.
*
* \param hash_table The hash table.
* \param iter Pointer to an iterator structure to
* initialise.
*/
HG_UTIL_PUBLIC void
hg_hash_table_iterate(hg_hash_table_t *hash_table, hg_hash_table_iter_t *iter);
/**
* Determine if there are more keys in the hash table to iterate over.
*
* \param iterator The hash table iterator.
* \return Zero if there are no more values to iterate
* over, non-zero if there are more values to
* iterate over.
*/
HG_UTIL_PUBLIC int
hg_hash_table_iter_has_more(hg_hash_table_iter_t *iterator);
/**
* Using a hash table iterator, retrieve the next key.
*
* \param iterator The hash table iterator.
* \return The next key from the hash table, or
* \ref HG_HASH_TABLE_NULL if there are no more
* keys to iterate over.
*/
HG_UTIL_PUBLIC hg_hash_table_value_t
hg_hash_table_iter_next(hg_hash_table_iter_t *iterator);
#ifdef __cplusplus
}
#endif
#endif /* HG_HASH_TABLE_H */

126
src/mercury/mercury_list.h Normal file
View File

@@ -0,0 +1,126 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Code below is derived from sys/queue.h which follows the below notice:
*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
*/
#ifndef MERCURY_LIST_H
#define MERCURY_LIST_H
#define HG_LIST_HEAD_INITIALIZER(name) \
{ \
NULL \
}
#define HG_LIST_HEAD_INIT(struct_head_name, var_name) \
struct struct_head_name var_name = HG_LIST_HEAD_INITIALIZER(var_name)
#define HG_LIST_HEAD_DECL(struct_head_name, struct_entry_name) \
struct struct_head_name { \
struct struct_entry_name *head; \
}
#define HG_LIST_HEAD(struct_entry_name) \
struct { \
struct struct_entry_name *head; \
}
#define HG_LIST_ENTRY(struct_entry_name) \
struct { \
struct struct_entry_name *next; \
struct struct_entry_name **prev; \
}
#define HG_LIST_INIT(head_ptr) \
do { \
(head_ptr)->head = NULL; \
} while (/*CONSTCOND*/ 0)
#define HG_LIST_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL)
#define HG_LIST_FIRST(head_ptr) ((head_ptr)->head)
#define HG_LIST_NEXT(entry_ptr, entry_field_name) \
((entry_ptr)->entry_field_name.next)
#define HG_LIST_INSERT_AFTER(list_entry_ptr, entry_ptr, entry_field_name) \
do { \
if (((entry_ptr)->entry_field_name.next = \
(list_entry_ptr)->entry_field_name.next) != NULL) \
(list_entry_ptr)->entry_field_name.next->entry_field_name.prev = \
&(entry_ptr)->entry_field_name.next; \
(list_entry_ptr)->entry_field_name.next = (entry_ptr); \
(entry_ptr)->entry_field_name.prev = \
&(list_entry_ptr)->entry_field_name.next; \
} while (/*CONSTCOND*/ 0)
#define HG_LIST_INSERT_BEFORE(list_entry_ptr, entry_ptr, entry_field_name) \
do { \
(entry_ptr)->entry_field_name.prev = \
(list_entry_ptr)->entry_field_name.prev; \
(entry_ptr)->entry_field_name.next = (list_entry_ptr); \
*(list_entry_ptr)->entry_field_name.prev = (entry_ptr); \
(list_entry_ptr)->entry_field_name.prev = \
&(entry_ptr)->entry_field_name.next; \
} while (/*CONSTCOND*/ 0)
#define HG_LIST_INSERT_HEAD(head_ptr, entry_ptr, entry_field_name) \
do { \
if (((entry_ptr)->entry_field_name.next = (head_ptr)->head) != NULL) \
(head_ptr)->head->entry_field_name.prev = \
&(entry_ptr)->entry_field_name.next; \
(head_ptr)->head = (entry_ptr); \
(entry_ptr)->entry_field_name.prev = &(head_ptr)->head; \
} while (/*CONSTCOND*/ 0)
/* TODO would be nice to not have any condition */
#define HG_LIST_REMOVE(entry_ptr, entry_field_name) \
do { \
if ((entry_ptr)->entry_field_name.next != NULL) \
(entry_ptr)->entry_field_name.next->entry_field_name.prev = \
(entry_ptr)->entry_field_name.prev; \
*(entry_ptr)->entry_field_name.prev = \
(entry_ptr)->entry_field_name.next; \
} while (/*CONSTCOND*/ 0)
#define HG_LIST_FOREACH(var, head_ptr, entry_field_name) \
for ((var) = ((head_ptr)->head); (var); \
(var) = ((var)->entry_field_name.next))
#endif /* MERCURY_LIST_H */

128
src/mercury/mercury_log.c Normal file
View File

@@ -0,0 +1,128 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_log.h"
#include <stdarg.h>
/****************/
/* Local Macros */
/****************/
#define HG_LOG_MAX_BUF 256
#ifdef HG_UTIL_HAS_LOG_COLOR
# define HG_LOG_ESC "\033"
# define HG_LOG_RESET HG_LOG_ESC "[0m"
# define HG_LOG_REG HG_LOG_ESC "[0;"
# define HG_LOG_BOLD HG_LOG_ESC "[1;"
# define HG_LOG_RED "31m"
# define HG_LOG_GREEN "32m"
# define HG_LOG_YELLOW "33m"
# define HG_LOG_BLUE "34m"
# define HG_LOG_MAGENTA "35m"
# define HG_LOG_CYAN "36m"
#endif
/*******************/
/* Local Variables */
/*******************/
static int (*hg_log_func_g)(FILE *stream, const char *format, ...) = fprintf;
static FILE *hg_log_stream_debug_g = NULL;
static FILE *hg_log_stream_warning_g = NULL;
static FILE *hg_log_stream_error_g = NULL;
/*---------------------------------------------------------------------------*/
void
hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...))
{
hg_log_func_g = log_func;
}
/*---------------------------------------------------------------------------*/
void
hg_log_set_stream_debug(FILE *stream)
{
hg_log_stream_debug_g = stream;
}
/*---------------------------------------------------------------------------*/
void
hg_log_set_stream_warning(FILE *stream)
{
hg_log_stream_warning_g = stream;
}
/*---------------------------------------------------------------------------*/
void
hg_log_set_stream_error(FILE *stream)
{
hg_log_stream_error_g = stream;
}
/*---------------------------------------------------------------------------*/
void
hg_log_write(unsigned int log_type, const char *module, const char *file,
unsigned int line, const char *func, const char *format, ...)
{
char buf[HG_LOG_MAX_BUF];
FILE *stream = NULL;
const char *msg_type = NULL;
#ifdef HG_UTIL_HAS_LOG_COLOR
const char *color = "";
#endif
va_list ap;
switch (log_type) {
case HG_LOG_TYPE_DEBUG:
#ifdef HG_UTIL_HAS_LOG_COLOR
color = HG_LOG_BLUE;
#endif
stream = hg_log_stream_debug_g ? hg_log_stream_debug_g : stdout;
msg_type = "Debug";
break;
case HG_LOG_TYPE_WARNING:
#ifdef HG_UTIL_HAS_LOG_COLOR
color = HG_LOG_MAGENTA;
#endif
stream = hg_log_stream_warning_g ? hg_log_stream_warning_g : stdout;
msg_type = "Warning";
break;
case HG_LOG_TYPE_ERROR:
#ifdef HG_UTIL_HAS_LOG_COLOR
color = HG_LOG_RED;
#endif
stream = hg_log_stream_error_g ? hg_log_stream_error_g : stderr;
msg_type = "Error";
break;
default:
return;
};
va_start(ap, format);
vsnprintf(buf, HG_LOG_MAX_BUF, format, ap);
va_end(ap);
/* Print using logging function */
#ifdef HG_UTIL_HAS_LOG_COLOR
hg_log_func_g(stream,
"# %s%s[%s -- %s%s%s%s%s -- %s:%d]%s\n"
"## %s%s%s()%s: %s\n",
HG_LOG_REG, color, module, HG_LOG_BOLD, color, msg_type, HG_LOG_REG,
color, file, line, HG_LOG_RESET, HG_LOG_REG, HG_LOG_YELLOW, func,
HG_LOG_RESET, buf);
#else
hg_log_func_g(stream,
"# %s -- %s -- %s:%d\n"
" # %s(): %s\n",
module, msg_type, file, line, func, buf);
#endif
}

104
src/mercury/mercury_log.h Normal file
View File

@@ -0,0 +1,104 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_LOG_H
#define MERCURY_LOG_H
#include "mercury_util_config.h"
#include <stdio.h>
#define HG_LOG_TYPE_NONE 0
#define HG_LOG_TYPE_DEBUG 0x01
#define HG_LOG_TYPE_WARNING 0x02
#define HG_LOG_TYPE_ERROR 0x04
/* For compatibility */
#if defined(__STDC_VERSION__) && (__STDC_VERSION__ < 199901L)
# if defined(__GNUC__) && (__GNUC__ >= 2)
# define __func__ __FUNCTION__
# else
# define __func__ "<unknown>"
# endif
#elif defined(_WIN32)
# define __func__ __FUNCTION__
#endif
#define HG_LOG_WRITE_ERROR(HG_LOG_MODULE_NAME, ...) \
do { \
hg_log_write(HG_LOG_TYPE_ERROR, HG_LOG_MODULE_NAME, __FILE__, \
__LINE__, __func__, __VA_ARGS__); \
} while (0)
#define HG_LOG_WRITE_DEBUG(HG_LOG_MODULE_NAME, ...) \
do { \
hg_log_write(HG_LOG_TYPE_DEBUG, HG_LOG_MODULE_NAME, __FILE__, \
__LINE__, __func__, __VA_ARGS__); \
} while (0)
#define HG_LOG_WRITE_WARNING(HG_LOG_MODULE_NAME, ...) \
do { \
hg_log_write(HG_LOG_TYPE_WARNING, HG_LOG_MODULE_NAME, __FILE__, \
__LINE__, __func__, __VA_ARGS__); \
} while (0)
#ifdef __cplusplus
extern "C" {
#endif
/**
* Set the logging function.
*
* \param log_func [IN] pointer to function
*/
HG_UTIL_PUBLIC void
hg_log_set_func(int (*log_func)(FILE *stream, const char *format, ...));
/**
* Set the stream for debug output.
*
* \param stream [IN/OUT] pointer to stream
*/
HG_UTIL_PUBLIC void
hg_log_set_stream_debug(FILE *stream);
/**
* Set the stream for warning output.
*
* \param stream [IN/OUT] pointer to stream
*/
HG_UTIL_PUBLIC void
hg_log_set_stream_warning(FILE *stream);
/**
* Set the stream for error output.
*
* \param stream [IN/OUT] pointer to stream
*/
HG_UTIL_PUBLIC void
hg_log_set_stream_error(FILE *stream);
/**
* Write log.
*
* \param log_type [IN] log type (HG_LOG_TYPE_DEBUG, etc)
* \param module [IN] module name
* \param file [IN] file name
* \param line [IN] line number
* \param func [IN] function name
* \param format [IN] string format
*/
HG_UTIL_PUBLIC void
hg_log_write(unsigned int log_type, const char *module, const char *file,
unsigned int line, const char *func, const char *format, ...);
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_LOG_H */

177
src/mercury/mercury_mem.c Normal file
View File

@@ -0,0 +1,177 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_mem.h"
#include "mercury_util_error.h"
#ifdef _WIN32
# include <windows.h>
#else
# include <errno.h>
# include <fcntl.h> /* For O_* constants */
# include <string.h>
# include <sys/mman.h>
# include <sys/stat.h> /* For mode constants */
# include <sys/types.h>
# include <unistd.h>
#endif
#include <stdlib.h>
/*---------------------------------------------------------------------------*/
long
hg_mem_get_page_size(void)
{
long page_size;
#ifdef _WIN32
SYSTEM_INFO system_info;
GetSystemInfo(&system_info);
page_size = system_info.dwPageSize;
#else
page_size = sysconf(_SC_PAGE_SIZE);
#endif
return page_size;
}
/*---------------------------------------------------------------------------*/
void *
hg_mem_aligned_alloc(size_t alignment, size_t size)
{
void *mem_ptr = NULL;
#ifdef _WIN32
mem_ptr = _aligned_malloc(size, alignment);
#else
# ifdef _ISOC11_SOURCE
mem_ptr = aligned_alloc(alignment, size);
# else
int rc = posix_memalign(&mem_ptr, alignment, size);
if (rc != 0)
return NULL;
# endif
#endif
return mem_ptr;
}
/*---------------------------------------------------------------------------*/
void
hg_mem_aligned_free(void *mem_ptr)
{
#ifdef _WIN32
_aligned_free(mem_ptr);
#else
free(mem_ptr);
#endif
}
/*---------------------------------------------------------------------------*/
void *
hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create)
{
void *mem_ptr = NULL;
#ifdef _WIN32
HANDLE fd = INVALID_HANDLE_VALUE;
LARGE_INTEGER large = {.QuadPart = size};
DWORD access = FILE_MAP_READ | FILE_MAP_WRITE;
BOOL rc;
if (create) {
fd = CreateFileMappingA(INVALID_HANDLE_VALUE, 0, PAGE_READWRITE,
large.HighPart, large.LowPart, name);
HG_UTIL_CHECK_ERROR_NORET(!fd, error, "CreateFileMappingA() failed");
} else {
fd = OpenFileMappingA(access, FALSE, name);
HG_UTIL_CHECK_ERROR_NORET(!fd, error, "OpenFileMappingA() failed");
}
mem_ptr = MapViewOfFile(fd, access, 0, 0, size);
HG_UTIL_CHECK_ERROR_NORET(!mem_ptr, error, "MapViewOfFile() failed");
/* The handle can be closed without affecting the memory mapping */
rc = CloseHandle(fd);
HG_UTIL_CHECK_ERROR_NORET(!rc, error, "CloseHandle() failed");
#else
int fd = 0;
int flags = O_RDWR | (create ? O_CREAT : 0);
struct stat shm_stat;
int rc;
fd = shm_open(name, flags, S_IRUSR | S_IWUSR);
HG_UTIL_CHECK_ERROR_NORET(
fd < 0, error, "shm_open() failed (%s)", strerror(errno));
rc = fstat(fd, &shm_stat);
HG_UTIL_CHECK_ERROR_NORET(
rc != 0, error, "fstat() failed (%s)", strerror(errno));
if (shm_stat.st_size == 0) {
rc = ftruncate(fd, (off_t) size);
HG_UTIL_CHECK_ERROR_NORET(
rc != 0, error, "ftruncate() failed (%s)", strerror(errno));
} else
HG_UTIL_CHECK_ERROR_NORET(
shm_stat.st_size < (off_t) size, error, "shm file size too small");
mem_ptr = mmap(NULL, size, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
HG_UTIL_CHECK_ERROR_NORET(
mem_ptr == MAP_FAILED, error, "mmap() failed (%s)", strerror(errno));
/* The file descriptor can be closed without affecting the memory mapping */
rc = close(fd);
HG_UTIL_CHECK_ERROR_NORET(
rc != 0, error, "close() failed (%s)", strerror(errno));
#endif
return mem_ptr;
error:
#ifdef _WIN32
if (fd)
CloseHandle(fd);
#else
if (fd > 0)
close(fd);
#endif
return NULL;
}
/*---------------------------------------------------------------------------*/
int
hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size)
{
int ret = HG_UTIL_SUCCESS;
#ifdef _WIN32
if (mem_ptr) {
BOOL rc = UnmapViewOfFile(mem_ptr);
HG_UTIL_CHECK_ERROR(
!rc, done, ret, HG_UTIL_FAIL, "UnmapViewOfFile() failed");
}
#else
if (mem_ptr && mem_ptr != MAP_FAILED) {
int rc = munmap(mem_ptr, size);
HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL,
"munmap() failed (%s)", strerror(errno));
}
if (name) {
int rc = shm_unlink(name);
HG_UTIL_CHECK_ERROR(rc != 0, done, ret, HG_UTIL_FAIL,
"shm_unlink() failed (%s)", strerror(errno));
}
#endif
done:
return ret;
}

92
src/mercury/mercury_mem.h Normal file
View File

@@ -0,0 +1,92 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_MEM_H
#define MERCURY_MEM_H
#include "mercury_util_config.h"
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
/*****************/
/* Public Macros */
/*****************/
#define HG_MEM_CACHE_LINE_SIZE 64
#define HG_MEM_PAGE_SIZE 4096
/*********************/
/* Public Prototypes */
/*********************/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Get system default page size.
*
* \return page size on success or negative on failure
*/
HG_UTIL_PUBLIC long
hg_mem_get_page_size(void);
/**
* Allocate size bytes and return a pointer to the allocated memory.
* The memory address will be a multiple of alignment, which must be a power of
* two, and size should be a multiple of alignment.
*
* \param alignment [IN] alignment size
* \param size [IN] total requested size
*
* \return a pointer to the allocated memory, or NULL in case of failure
*/
HG_UTIL_PUBLIC void *
hg_mem_aligned_alloc(size_t alignment, size_t size);
/**
* Free memory allocated from hg_aligned_alloc().
*
* \param mem_ptr [IN] pointer to allocated memory
*/
HG_UTIL_PUBLIC void
hg_mem_aligned_free(void *mem_ptr);
/**
* Create/open a shared-memory mapped file of size \size with name \name.
*
* \param name [IN] name of mapped file
* \param size [IN] total requested size
* \param create [IN] create file if not existing
*
* \return a pointer to the mapped memory region, or NULL in case of failure
*/
HG_UTIL_PUBLIC void *
hg_mem_shm_map(const char *name, size_t size, hg_util_bool_t create);
/**
* Unmap a previously mapped region and close the file.
*
* \param name [IN] name of mapped file
* \param mem_ptr [IN] pointer to mapped memory region
* \param size [IN] size range of the mapped region
*
* \return non-negative on success, or negative in case of failure
*/
HG_UTIL_PUBLIC int
hg_mem_shm_unmap(const char *name, void *mem_ptr, size_t size);
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_MEM_H */

531
src/mercury/mercury_poll.c Normal file
View File

@@ -0,0 +1,531 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_poll.h"
#include "mercury_atomic.h"
#include "mercury_event.h"
#include "mercury_list.h"
#include "mercury_thread_spin.h"
#include "mercury_util_error.h"
#include <stdlib.h>
#if defined(_WIN32)
/* TODO */
#else
# include <errno.h>
# include <string.h>
# include <unistd.h>
# if defined(HG_UTIL_HAS_SYSEPOLL_H)
# include <sys/epoll.h>
# elif defined(HG_UTIL_HAS_SYSEVENT_H)
# include <sys/event.h>
# include <sys/time.h>
# else
# include <poll.h>
# endif
#endif /* defined(_WIN32) */
/****************/
/* Local Macros */
/****************/
#define HG_POLL_MAX_EVENTS 1024
#ifndef MIN
# define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
/************************************/
/* Local Type and Struct Definition */
/************************************/
struct hg_poll_data {
#if defined(HG_UTIL_HAS_SYSEPOLL_H)
int fd;
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
struct kevent kev;
#else
struct pollfd pollfd;
#endif
hg_poll_cb_t poll_cb;
void *poll_arg;
HG_LIST_ENTRY(hg_poll_data) entry;
};
struct hg_poll_set {
int fd;
hg_atomic_int32_t nfds;
hg_poll_try_wait_cb_t try_wait_cb;
void *try_wait_arg;
HG_LIST_HEAD(hg_poll_data) poll_data_list;
hg_thread_spin_t poll_data_list_lock;
};
/********************/
/* Local Prototypes */
/********************/
/*******************/
/* Local Variables */
/*******************/
/*---------------------------------------------------------------------------*/
hg_poll_set_t *
hg_poll_create(void)
{
struct hg_poll_set *hg_poll_set = NULL;
hg_poll_set = malloc(sizeof(struct hg_poll_set));
HG_UTIL_CHECK_ERROR_NORET(
hg_poll_set == NULL, error, "malloc() failed (%s)");
#if defined(_WIN32)
/* TODO */
#else
HG_LIST_INIT(&hg_poll_set->poll_data_list);
hg_thread_spin_init(&hg_poll_set->poll_data_list_lock);
hg_atomic_init32(&hg_poll_set->nfds, 0);
hg_poll_set->try_wait_cb = NULL;
# if defined(HG_UTIL_HAS_SYSEPOLL_H)
hg_poll_set->fd = epoll_create1(0);
HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error,
"epoll_create1() failed (%s)", strerror(errno));
# elif defined(HG_UTIL_HAS_SYSEVENT_H)
hg_poll_set->fd = kqueue();
HG_UTIL_CHECK_ERROR_NORET(
hg_poll_set->fd == -1, error, "kqueue() failed (%s)", strerror(errno));
# else
hg_poll_set->fd = hg_event_create();
HG_UTIL_CHECK_ERROR_NORET(hg_poll_set->fd == -1, error,
"hg_event_create() failed (%s)", strerror(errno));
# endif
#endif /* defined(_WIN32) */
return hg_poll_set;
error:
if (hg_poll_set) {
hg_thread_spin_destroy(&hg_poll_set->poll_data_list_lock);
free(hg_poll_set);
}
return NULL;
}
/*---------------------------------------------------------------------------*/
int
hg_poll_destroy(hg_poll_set_t *poll_set)
{
int ret = HG_UTIL_SUCCESS;
int rc;
if (!poll_set)
goto done;
#if defined(_WIN32)
/* TODO */
#else
HG_UTIL_CHECK_ERROR(hg_atomic_get32(&poll_set->nfds), done, ret,
HG_UTIL_FAIL, "Poll set non empty");
# if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H)
/* Close poll descriptor */
rc = close(poll_set->fd);
HG_UTIL_CHECK_ERROR(rc == -1, done, ret, HG_UTIL_FAIL,
"close() failed (%s)", strerror(errno));
# else
rc = hg_event_destroy(poll_set->fd);
HG_UTIL_CHECK_ERROR(rc == HG_UTIL_FAIL, done, ret, HG_UTIL_FAIL,
"hg_event_destroy() failed (%s)", strerror(errno));
# endif
hg_thread_spin_destroy(&poll_set->poll_data_list_lock);
#endif /* defined(_WIN32) */
free(poll_set);
done:
return ret;
}
/*---------------------------------------------------------------------------*/
int
hg_poll_get_fd(hg_poll_set_t *poll_set)
{
int fd = -1;
HG_UTIL_CHECK_ERROR_NORET(!poll_set, done, "NULL poll set");
#if defined(_WIN32)
/* TODO */
#else
fd = poll_set->fd;
#endif
done:
return fd;
}
/*---------------------------------------------------------------------------*/
int
hg_poll_set_try_wait(
hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb, void *arg)
{
int ret = HG_UTIL_SUCCESS;
HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
poll_set->try_wait_cb = try_wait_cb;
poll_set->try_wait_arg = arg;
done:
return ret;
}
/*---------------------------------------------------------------------------*/
int
hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags,
hg_poll_cb_t poll_cb, void *poll_arg)
{
struct hg_poll_data *hg_poll_data = NULL;
int ret = HG_UTIL_SUCCESS;
HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
/* Allocate poll data that can hold user data and callback */
hg_poll_data = malloc(sizeof(struct hg_poll_data));
HG_UTIL_CHECK_ERROR(
!hg_poll_data, done, ret, HG_UTIL_FAIL, "malloc() failed (%s)");
memset(hg_poll_data, 0, sizeof(struct hg_poll_data));
hg_poll_data->poll_cb = poll_cb;
hg_poll_data->poll_arg = poll_arg;
if (fd > 0) {
#if defined(_WIN32)
/* TODO */
#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
struct epoll_event ev;
uint32_t poll_flags;
int rc;
/* Translate flags */
switch (flags) {
case HG_POLLIN:
poll_flags = EPOLLIN;
break;
case HG_POLLOUT:
poll_flags = EPOLLOUT;
break;
default:
HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
}
hg_poll_data->fd = fd;
ev.events = poll_flags;
ev.data.ptr = hg_poll_data;
rc = epoll_ctl(poll_set->fd, EPOLL_CTL_ADD, fd, &ev);
HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL,
"epoll_ctl() failed (%s)", strerror(errno));
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
struct timespec timeout = {0, 0};
int16_t poll_flags;
int rc;
/* Translate flags */
switch (flags) {
case HG_POLLIN:
poll_flags = EVFILT_READ;
break;
case HG_POLLOUT:
poll_flags = EVFILT_WRITE;
break;
default:
HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
}
EV_SET(&hg_poll_data->kev, (uintptr_t) fd, poll_flags, EV_ADD, 0, 0,
hg_poll_data);
rc = kevent(poll_set->fd, &hg_poll_data->kev, 1, NULL, 0, &timeout);
HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL,
"kevent() failed (%s)", strerror(errno));
#else
short int poll_flags;
/* Translate flags */
switch (flags) {
case HG_POLLIN:
poll_flags = POLLIN;
break;
case HG_POLLOUT:
poll_flags = POLLOUT;
break;
default:
HG_UTIL_GOTO_ERROR(error, ret, HG_UTIL_FAIL, "Invalid flag");
}
hg_poll_data->pollfd.fd = fd;
hg_poll_data->pollfd.events = poll_flags;
hg_poll_data->pollfd.revents = 0;
#endif /* defined(_WIN32) */
}
hg_atomic_incr32(&poll_set->nfds);
hg_thread_spin_lock(&poll_set->poll_data_list_lock);
HG_LIST_INSERT_HEAD(&poll_set->poll_data_list, hg_poll_data, entry);
hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
done:
return ret;
error:
free(hg_poll_data);
return HG_UTIL_FAIL;
}
/*---------------------------------------------------------------------------*/
int
hg_poll_remove(hg_poll_set_t *poll_set, int fd)
{
struct hg_poll_data *hg_poll_data;
hg_util_bool_t found = HG_UTIL_FALSE;
int ret = HG_UTIL_SUCCESS;
HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
hg_thread_spin_lock(&poll_set->poll_data_list_lock);
HG_LIST_FOREACH (hg_poll_data, &poll_set->poll_data_list, entry) {
#if defined(_WIN32)
/* TODO */
#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
if (hg_poll_data->fd == fd) {
HG_LIST_REMOVE(hg_poll_data, entry);
if (fd > 0) {
int rc = epoll_ctl(poll_set->fd, EPOLL_CTL_DEL, fd, NULL);
HG_UTIL_CHECK_ERROR(rc != 0, error, ret, HG_UTIL_FAIL,
"epoll_ctl() failed (%s)", strerror(errno));
}
free(hg_poll_data);
found = HG_UTIL_TRUE;
break;
}
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
/* Events which are attached to file descriptors are automatically
* deleted on the last close of the descriptor. */
if ((int) hg_poll_data->kev.ident == fd) {
HG_LIST_REMOVE(hg_poll_data, entry);
if (fd > 0) {
struct timespec timeout = {0, 0};
int rc;
EV_SET(&hg_poll_data->kev, (uintptr_t) fd, EVFILT_READ,
EV_DELETE, 0, 0, NULL);
rc = kevent(
poll_set->fd, &hg_poll_data->kev, 1, NULL, 0, &timeout);
HG_UTIL_CHECK_ERROR(rc == -1, error, ret, HG_UTIL_FAIL,
"kevent() failed (%s)", strerror(errno));
}
free(hg_poll_data);
found = HG_UTIL_TRUE;
break;
}
#else
if (hg_poll_data->pollfd.fd == fd) {
HG_LIST_REMOVE(hg_poll_data, entry);
free(hg_poll_data);
found = HG_UTIL_TRUE;
break;
}
#endif
}
hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
HG_UTIL_CHECK_ERROR(
!found, done, ret, HG_UTIL_FAIL, "Could not find fd in poll_set");
hg_atomic_decr32(&poll_set->nfds);
done:
return ret;
#if defined(HG_UTIL_HAS_SYSEPOLL_H) || defined(HG_UTIL_HAS_SYSEVENT_H)
error:
hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
return ret;
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout,
unsigned int max_events, struct hg_poll_event *events,
unsigned int *actual_events)
{
int max_poll_events = (int) MIN(max_events, HG_POLL_MAX_EVENTS);
int nfds = 0, i;
int ret = HG_UTIL_SUCCESS;
HG_UTIL_CHECK_ERROR(!poll_set, done, ret, HG_UTIL_FAIL, "NULL poll set");
if (timeout && (!poll_set->try_wait_cb ||
(poll_set->try_wait_cb &&
poll_set->try_wait_cb(poll_set->try_wait_arg)))) {
#if defined(_WIN32)
#elif defined(HG_UTIL_HAS_SYSEPOLL_H)
struct epoll_event poll_events[HG_POLL_MAX_EVENTS];
nfds = epoll_wait(
poll_set->fd, poll_events, max_poll_events, (int) timeout);
HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret,
HG_UTIL_FAIL, "epoll_wait() failed (%s)", strerror(errno));
for (i = 0; i < nfds; ++i) {
struct hg_poll_data *hg_poll_data =
(struct hg_poll_data *) poll_events[i].data.ptr;
int error = 0, rc;
HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL,
"NULL poll data");
/* Don't change the if/else order */
if (poll_events[i].events & EPOLLERR)
error = EPOLLERR;
else if (poll_events[i].events & EPOLLHUP)
error = EPOLLHUP;
else if (poll_events[i].events & EPOLLRDHUP)
error = EPOLLRDHUP;
HG_UTIL_CHECK_ERROR(!(poll_events[i].events & (EPOLLIN | EPOLLOUT)),
done, ret, HG_UTIL_FAIL, "Unsupported events");
if (!hg_poll_data->poll_cb)
continue;
rc = hg_poll_data->poll_cb(
hg_poll_data->poll_arg, error, &events[i]);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"poll cb failed");
}
#elif defined(HG_UTIL_HAS_SYSEVENT_H)
struct kevent poll_events[HG_POLL_MAX_EVENTS];
struct timespec timeout_spec;
ldiv_t ld;
/* Get sec / nsec */
ld = ldiv(timeout, 1000L);
timeout_spec.tv_sec = ld.quot;
timeout_spec.tv_nsec = ld.rem * 1000000L;
nfds = kevent(
poll_set->fd, NULL, 0, poll_events, max_events, &timeout_spec);
HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret,
HG_UTIL_FAIL, "kevent() failed (%s)", strerror(errno));
for (i = 0; i < nfds; ++i) {
struct hg_poll_data *hg_poll_data =
(struct hg_poll_data *) poll_events[i].udata;
int rc;
HG_UTIL_CHECK_ERROR(hg_poll_data == NULL, done, ret, HG_UTIL_FAIL,
"NULL poll data");
if (!hg_poll_data->poll_cb)
continue;
rc = hg_poll_data->poll_cb(hg_poll_data->poll_arg, 0, &events[i]);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"poll cb failed");
}
#else
struct pollfd poll_events[HG_POLL_MAX_EVENTS] = {0};
struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL};
struct hg_poll_data *hg_poll_data = NULL;
int nevents = 0;
/* Reset revents */
hg_thread_spin_lock(&poll_set->poll_data_list_lock);
for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list);
hg_poll_data && (nevents < max_poll_events);
hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++) {
poll_events[nevents] = hg_poll_data->pollfd;
poll_data_events[nevents] = hg_poll_data;
}
hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
nfds = poll(poll_events, nevents, (int) timeout);
HG_UTIL_CHECK_ERROR(nfds == -1 && errno != EINTR, done, ret,
HG_UTIL_FAIL, "poll() failed (%s)", strerror(errno));
/* An event on one of the fds has occurred. */
for (i = 0; i < nfds; ++i) {
int rc;
if (!(poll_events[i].revents & poll_events[i].events))
continue;
/* TODO check POLLHUP | POLLERR | POLLNVAL */
if (!poll_data_events[i]->poll_cb)
continue;
rc = poll_data_events[i]->poll_cb(
poll_data_events[i]->poll_arg, 0, &events[i]);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"poll cb failed");
}
if (nfds) {
/* TODO should figure where to call hg_event_get() */
int rc = hg_event_set(poll_set->fd);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"hg_event_set() failed (%s)", strerror(errno));
}
#endif
} else {
#ifdef _WIN32
#else
struct hg_poll_data *poll_data_events[HG_POLL_MAX_EVENTS] = {NULL};
struct hg_poll_data *hg_poll_data;
int nevents = 0;
/* Reset revents */
hg_thread_spin_lock(&poll_set->poll_data_list_lock);
for (hg_poll_data = HG_LIST_FIRST(&poll_set->poll_data_list);
hg_poll_data && (nevents < max_poll_events);
hg_poll_data = HG_LIST_NEXT(hg_poll_data, entry), nevents++)
poll_data_events[nevents] = hg_poll_data;
hg_thread_spin_unlock(&poll_set->poll_data_list_lock);
nfds = nevents;
for (i = 0; i < nfds; ++i) {
int rc;
if (!poll_data_events[i]->poll_cb)
continue;
rc = poll_data_events[i]->poll_cb(
poll_data_events[i]->poll_arg, 0, &events[i]);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"poll cb failed");
}
#endif
}
if (actual_events)
*actual_events = (unsigned int) nfds;
done:
return ret;
}

164
src/mercury/mercury_poll.h Normal file
View File

@@ -0,0 +1,164 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_POLL_H
#define MERCURY_POLL_H
#include "mercury_util_config.h"
/**
* Purpose: define an interface that either polls or allows busy wait
* without entering system calls.
*/
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
typedef struct hg_poll_set hg_poll_set_t;
struct hg_poll_event {
hg_util_bool_t progressed; /* Indicates progress */
void *ptr; /* Pointer to user data */
};
/**
* Callback that can be used to signal when it is safe to block on the
* poll set or if blocking could hang the application.
*
* \param arg [IN] function argument
*
* \return HG_UTIL_TRUE if it is safe to block or HG_UTIL_FALSE otherwise
*/
typedef hg_util_bool_t (*hg_poll_try_wait_cb_t)(void *arg);
/**
* Polling callback, arg can be used to pass user arguments, event can be used
* to return user arguments back to hg_poll_wait.
*
* \param arg [IN] pointer to user data
* \param error [IN] any error event has occurred
* \param ptr [OUT] event data output
*
* \return Non-negative on success or negative on failure
*/
typedef int (*hg_poll_cb_t)(void *arg, int error, struct hg_poll_event *event);
/*****************/
/* Public Macros */
/*****************/
/**
* Polling events.
*/
#define HG_POLLIN 0x001 /* Ready to read. */
#define HG_POLLOUT 0x004 /* Ready to write. */
/*********************/
/* Public Prototypes */
/*********************/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Create a new poll set.
*
* \return Pointer to poll set or NULL in case of failure
*/
HG_UTIL_PUBLIC hg_poll_set_t *
hg_poll_create(void);
/**
* Destroy a poll set.
*
* \param poll_set [IN] pointer to poll set
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_destroy(hg_poll_set_t *poll_set);
/**
* Get a file descriptor from an existing poll set.
*
* \param poll_set [IN] pointer to poll set
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_get_fd(hg_poll_set_t *poll_set);
/**
* Set a callback that can be used to signal when it is safe to block on the
* poll set or if blocking could hang the application, in which case behavior
* is the same as passing a timeout of 0.
*
* \param poll_set [IN] pointer to poll set
* \param try_wait_cb [IN] function pointer
* \param try_wait_arg [IN] function pointer argument
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_set_try_wait(hg_poll_set_t *poll_set, hg_poll_try_wait_cb_t try_wait_cb,
void *try_wait_arg);
/**
* Add file descriptor to poll set.
*
* \param poll_set [IN] pointer to poll set
* \param fd [IN] file descriptor
* \param flags [IN] polling flags (HG_POLLIN, etc)
* \param poll_cb [IN] function pointer
* \param poll_cb_args [IN] function pointer argument
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_add(hg_poll_set_t *poll_set, int fd, unsigned int flags,
hg_poll_cb_t poll_cb, void *poll_cb_arg);
/**
* Remove file descriptor from poll set.
*
* \param poll_set [IN] pointer to poll set
* \param fd [IN] file descriptor
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_remove(hg_poll_set_t *poll_set, int fd);
/**
* Wait on a poll set for timeout ms, progressed indicating whether progress has
* been made after that call returns. If timeout is 0, progress is performed
* on all the registered polling callbacks and hg_poll_wait() exits as soon as
* progress is made. If timeout is non 0, the system dependent polling function
* call is entered and progress is performed on the list of file descriptors
* for which an event has occurred.
*
* \param poll_set [IN] pointer to poll set
* \param timeout [IN] timeout (in milliseconds)
* \param progressed [OUT] pointer to boolean indicating progress made
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_poll_wait(hg_poll_set_t *poll_set, unsigned int timeout,
unsigned int max_events, struct hg_poll_event events[],
unsigned int *actual_events);
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_POLL_H */

123
src/mercury/mercury_queue.h Normal file
View File

@@ -0,0 +1,123 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Code below is derived from sys/queue.h which follows the below notice:
*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
*/
#ifndef MERCURY_QUEUE_H
#define MERCURY_QUEUE_H
#define HG_QUEUE_HEAD_INITIALIZER(name) \
{ \
NULL, &(name).head \
}
#define HG_QUEUE_HEAD_INIT(struct_head_name, var_name) \
struct struct_head_name var_name = HG_QUEUE_HEAD_INITIALIZER(var_name)
#define HG_QUEUE_HEAD_DECL(struct_head_name, struct_entry_name) \
struct struct_head_name { \
struct struct_entry_name *head; \
struct struct_entry_name **tail; \
}
#define HG_QUEUE_HEAD(struct_entry_name) \
struct { \
struct struct_entry_name *head; \
struct struct_entry_name **tail; \
}
#define HG_QUEUE_ENTRY(struct_entry_name) \
struct { \
struct struct_entry_name *next; \
}
#define HG_QUEUE_INIT(head_ptr) \
do { \
(head_ptr)->head = NULL; \
(head_ptr)->tail = &(head_ptr)->head; \
} while (/*CONSTCOND*/ 0)
#define HG_QUEUE_IS_EMPTY(head_ptr) ((head_ptr)->head == NULL)
#define HG_QUEUE_FIRST(head_ptr) ((head_ptr)->head)
#define HG_QUEUE_NEXT(entry_ptr, entry_field_name) \
((entry_ptr)->entry_field_name.next)
#define HG_QUEUE_PUSH_TAIL(head_ptr, entry_ptr, entry_field_name) \
do { \
(entry_ptr)->entry_field_name.next = NULL; \
*(head_ptr)->tail = (entry_ptr); \
(head_ptr)->tail = &(entry_ptr)->entry_field_name.next; \
} while (/*CONSTCOND*/ 0)
/* TODO would be nice to not have any condition */
#define HG_QUEUE_POP_HEAD(head_ptr, entry_field_name) \
do { \
if ((head_ptr)->head && \
((head_ptr)->head = (head_ptr)->head->entry_field_name.next) == \
NULL) \
(head_ptr)->tail = &(head_ptr)->head; \
} while (/*CONSTCOND*/ 0)
#define HG_QUEUE_FOREACH(var, head_ptr, entry_field_name) \
for ((var) = ((head_ptr)->head); (var); \
(var) = ((var)->entry_field_name.next))
/**
* Avoid using those for performance reasons or use mercury_list.h instead
*/
#define HG_QUEUE_REMOVE(head_ptr, entry_ptr, type, entry_field_name) \
do { \
if ((head_ptr)->head == (entry_ptr)) { \
HG_QUEUE_POP_HEAD((head_ptr), entry_field_name); \
} else { \
struct type *curelm = (head_ptr)->head; \
while (curelm->entry_field_name.next != (entry_ptr)) \
curelm = curelm->entry_field_name.next; \
if ((curelm->entry_field_name.next = \
curelm->entry_field_name.next->entry_field_name \
.next) == NULL) \
(head_ptr)->tail = &(curelm)->entry_field_name.next; \
} \
} while (/*CONSTCOND*/ 0)
#endif /* MERCURY_QUEUE_H */

View File

@@ -0,0 +1,224 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_request.h"
#include "mercury_thread_condition.h"
#include "mercury_thread_mutex.h"
#include "mercury_time.h"
#include "mercury_util_error.h"
#include <stdlib.h>
/****************/
/* Local Macros */
/****************/
/************************************/
/* Local Type and Struct Definition */
/************************************/
struct hg_request_class {
hg_request_progress_func_t progress_func;
hg_request_trigger_func_t trigger_func;
void *arg;
hg_util_bool_t progressing;
hg_thread_mutex_t progress_mutex;
hg_thread_cond_t progress_cond;
};
/********************/
/* Local Prototypes */
/********************/
/*******************/
/* Local Variables */
/*******************/
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_util_bool_t
hg_request_check(hg_request_t *request)
{
int trigger_ret;
unsigned int trigger_flag = 0;
hg_util_bool_t ret = HG_UTIL_FALSE;
do {
trigger_ret = request->request_class->trigger_func(
0, &trigger_flag, request->request_class->arg);
} while ((trigger_ret == HG_UTIL_SUCCESS) && trigger_flag);
if (hg_atomic_cas32(&request->completed, HG_UTIL_TRUE, HG_UTIL_FALSE))
ret = HG_UTIL_TRUE;
return ret;
}
/*---------------------------------------------------------------------------*/
hg_request_class_t *
hg_request_init(hg_request_progress_func_t progress_func,
hg_request_trigger_func_t trigger_func, void *arg)
{
struct hg_request_class *hg_request_class = NULL;
hg_request_class =
(struct hg_request_class *) malloc(sizeof(struct hg_request_class));
HG_UTIL_CHECK_ERROR_NORET(
hg_request_class == NULL, done, "Could not allocate hg_request_class");
hg_request_class->progress_func = progress_func;
hg_request_class->trigger_func = trigger_func;
hg_request_class->arg = arg;
hg_request_class->progressing = HG_UTIL_FALSE;
hg_thread_mutex_init(&hg_request_class->progress_mutex);
hg_thread_cond_init(&hg_request_class->progress_cond);
done:
return hg_request_class;
}
/*---------------------------------------------------------------------------*/
int
hg_request_finalize(hg_request_class_t *request_class, void **arg)
{
if (!request_class)
goto done;
if (arg)
*arg = request_class->arg;
hg_thread_mutex_destroy(&request_class->progress_mutex);
hg_thread_cond_destroy(&request_class->progress_cond);
free(request_class);
done:
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
hg_request_t *
hg_request_create(hg_request_class_t *request_class)
{
struct hg_request *hg_request = NULL;
hg_request = (struct hg_request *) malloc(sizeof(struct hg_request));
HG_UTIL_CHECK_ERROR_NORET(
hg_request == NULL, done, "Could not allocate hg_request");
hg_request->data = NULL;
hg_atomic_set32(&hg_request->completed, HG_UTIL_FALSE);
hg_request->request_class = request_class;
done:
return hg_request;
}
/*---------------------------------------------------------------------------*/
int
hg_request_destroy(hg_request_t *request)
{
int ret = HG_UTIL_SUCCESS;
free(request);
return ret;
}
/*---------------------------------------------------------------------------*/
/*
* lock(progress_mutex)
* while (!completed) {
* check_request
* if (completed) {
* unlock(progress_mutex);
* return;
* }
* if (in_progress) {
* wait_cond(progress_cond);
* continue;
* }
* in_progress = true;
* unlock(progress_mutex);
* trigger;
* progress;
* lock(progress);
* in_progress = false;
* signal(progress_cond);
* }
* unlock(progress_mutex);
*/
/*---------------------------------------------------------------------------*/
int
hg_request_wait(hg_request_t *request, unsigned int timeout, unsigned int *flag)
{
double remaining =
timeout / 1000.0; /* Convert timeout in ms into seconds */
hg_util_bool_t completed = HG_UTIL_FALSE;
int ret = HG_UTIL_SUCCESS;
hg_thread_mutex_lock(&request->request_class->progress_mutex);
do {
hg_time_t t3, t4;
completed = hg_request_check(request);
if (completed)
break;
if (request->request_class->progressing) {
hg_time_t t1, t2;
if (remaining <= 0) {
/* Timeout occurred so leave */
break;
}
hg_time_get_current(&t1);
if (hg_thread_cond_timedwait(&request->request_class->progress_cond,
&request->request_class->progress_mutex,
(unsigned int) (remaining * 1000.0)) != HG_UTIL_SUCCESS) {
/* Timeout occurred so leave */
break;
}
hg_time_get_current(&t2);
remaining -= hg_time_to_double(hg_time_subtract(t2, t1));
if (remaining < 0)
break;
/* Continue as request may have completed in the meantime */
continue;
}
request->request_class->progressing = HG_UTIL_TRUE;
hg_thread_mutex_unlock(&request->request_class->progress_mutex);
if (timeout)
hg_time_get_current(&t3);
request->request_class->progress_func(
(unsigned int) (remaining * 1000.0), request->request_class->arg);
if (timeout) {
hg_time_get_current(&t4);
remaining -= hg_time_to_double(hg_time_subtract(t4, t3));
}
hg_thread_mutex_lock(&request->request_class->progress_mutex);
request->request_class->progressing = HG_UTIL_FALSE;
hg_thread_cond_broadcast(&request->request_class->progress_cond);
} while (!completed && (remaining > 0));
hg_thread_mutex_unlock(&request->request_class->progress_mutex);
if (flag)
*flag = completed;
return ret;
}

View File

@@ -0,0 +1,242 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_REQUEST_H
#define MERCURY_REQUEST_H
#include "mercury_util_config.h"
#include "mercury_atomic.h"
/**
* Purpose: define a request emulation library on top of the callback model
* that uses progress/trigger functions. Note that this library can not be
* safely used within RPCs in most cases - calling hg_request_wait causes
* deadlock when the caller function was triggered by HG_Trigger
* (or HG_Bulk_trigger).
*/
typedef struct hg_request_class hg_request_class_t; /* Opaque request class */
typedef struct hg_request hg_request_t; /* Opaque request object */
struct hg_request {
void *data;
hg_atomic_int32_t completed;
hg_request_class_t *request_class;
};
/**
* Progress callback, arg can be used to pass extra parameters required by
* underlying API.
*
* \param timeout [IN] timeout (in milliseconds)
* \param arg [IN] pointer to data passed to callback
*
* \return HG_UTIL_SUCCESS if any completion has occurred / error code otherwise
*/
typedef int (*hg_request_progress_func_t)(unsigned int timeout, void *arg);
/**
* Trigger callback, arg can be used to pass extra parameters required by
* underlying API.
*
* \param timeout [IN] timeout (in milliseconds)
* \param flag [OUT] 1 if callback has been triggered, 0 otherwise
* \param arg [IN] pointer to data passed to callback
*
* \return HG_UTIL_SUCCESS or corresponding error code
*/
typedef int (*hg_request_trigger_func_t)(
unsigned int timeout, unsigned int *flag, void *arg);
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the request class with the specific progress/trigger functions
* that will be called on hg_request_wait().
* arg can be used to pass extra parameters required by underlying API.
*
* \param progress [IN] progress function
* \param trigger [IN] trigger function
* \param arg [IN] pointer to data passed to callback
*
* \return Pointer to request class or NULL in case of failure
*/
HG_UTIL_PUBLIC hg_request_class_t *
hg_request_init(hg_request_progress_func_t progress,
hg_request_trigger_func_t trigger, void *arg);
/**
* Finalize the request class. User args that were passed through
* hg_request_init() can be retrieved through the \a arg parameter.
*
* \param request_class [IN] pointer to request class
* \param arg [IN/OUT] pointer to init args
*/
HG_UTIL_PUBLIC int
hg_request_finalize(hg_request_class_t *request_class, void **arg);
/**
* Create a new request from a specified request class. The progress function
* explicitly makes progress and may insert the completed operation into a
* completion queue. The operation gets triggered after a call to the trigger
* function.
*
* \param request_class [IN] pointer to request class
*
* \return Pointer to request or NULL in case of failure
*/
HG_UTIL_PUBLIC hg_request_t *
hg_request_create(hg_request_class_t *request_class);
/**
* Destroy the request, freeing the resources.
*
* \param request [IN/OUT] pointer to request
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_request_destroy(hg_request_t *request);
/**
* Reset an existing request so that it can be safely re-used.
*
* \param request [IN/OUT] pointer to request
*
* \return Pointer to request or NULL in case of failure
*/
static HG_UTIL_INLINE int
hg_request_reset(hg_request_t *request);
/**
* Mark the request as completed. (most likely called by a callback triggered
* after a call to trigger)
*
* \param request [IN/OUT] pointer to request
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_request_complete(hg_request_t *request);
/**
* Wait timeout ms for the specified request to complete.
*
* \param request [IN/OUT] pointer to request
* \param timeout [IN] timeout (in milliseconds)
* \param flag [OUT] 1 if request has completed, 0 otherwise
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_request_wait(
hg_request_t *request, unsigned int timeout, unsigned int *flag);
/**
* Wait timeout ms for all the specified request to complete.
*
* \param count [IN] number of requests
* \param request [IN/OUT] arrays of requests
* \param timeout [IN] timeout (in milliseconds)
* \param flag [OUT] 1 if all requests have completed, 0 otherwise
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout,
unsigned int *flag);
/**
* Attach user data to a specified request.
*
* \param request [IN/OUT] pointer to request
* \param data [IN] pointer to data
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_request_set_data(hg_request_t *request, void *data);
/**
* Get user data from a specified request.
*
* \param request [IN/OUT] pointer to request
*
* \return Pointer to data or NULL if nothing was attached by user
*/
static HG_UTIL_INLINE void *
hg_request_get_data(hg_request_t *request);
/**
* Cancel the request.
*
* \param request [IN] request object
*
* \return Non-negative on success or negative on failure
*
HG_UTIL_PUBLIC int
hg_request_cancel(hg_request_t *request);
*/
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_request_reset(hg_request_t *request)
{
hg_atomic_set32(&request->completed, HG_UTIL_FALSE);
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_request_complete(hg_request_t *request)
{
hg_atomic_incr32(&request->completed);
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_request_waitall(int count, hg_request_t *request[], unsigned int timeout,
unsigned int *flag)
{
int i;
for (i = 0; i < count; i++)
hg_request_wait(request[i], timeout, flag);
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_request_set_data(hg_request_t *request, void *data)
{
request->data = data;
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void *
hg_request_get_data(hg_request_t *request)
{
return request->data;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_REQUEST_H */

View File

@@ -0,0 +1,162 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_thread.h"
/*---------------------------------------------------------------------------*/
void
hg_thread_init(hg_thread_t *thread)
{
#ifdef _WIN32
*thread = NULL;
#else
*thread = 0;
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data)
{
#ifdef _WIN32
*thread = CreateThread(NULL, 0, f, data, 0, NULL);
if (*thread == NULL)
return HG_UTIL_FAIL;
#else
if (pthread_create(thread, NULL, f, data))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
void
hg_thread_exit(hg_thread_ret_t ret)
{
#ifdef _WIN32
ExitThread(ret);
#else
pthread_exit(ret);
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_thread_join(hg_thread_t thread)
{
#ifdef _WIN32
WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
#else
if (pthread_join(thread, NULL))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_cancel(hg_thread_t thread)
{
#ifdef _WIN32
WaitForSingleObject(thread, 0);
CloseHandle(thread);
#else
if (pthread_cancel(thread))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_yield(void)
{
#ifdef _WIN32
SwitchToThread();
#elif defined(__APPLE__)
pthread_yield_np();
#else
pthread_yield();
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_key_create(hg_thread_key_t *key)
{
if (!key)
return HG_UTIL_FAIL;
#ifdef _WIN32
if ((*key = TlsAlloc()) == TLS_OUT_OF_INDEXES)
return HG_UTIL_FAIL;
#else
if (pthread_key_create(key, NULL))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_key_delete(hg_thread_key_t key)
{
#ifdef _WIN32
if (!TlsFree(key))
return HG_UTIL_FAIL;
#else
if (pthread_key_delete(key))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask)
{
#if defined(_WIN32)
return HG_UTIL_FAIL;
#elif defined(__APPLE__)
(void) thread;
(void) cpu_mask;
return HG_UTIL_FAIL;
#else
if (pthread_getaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask)
{
#if defined(_WIN32)
if (!SetThreadAffinityMask(thread, *cpu_mask))
return HG_UTIL_FAIL;
#elif defined(__APPLE__)
(void) thread;
(void) cpu_mask;
return HG_UTIL_FAIL;
#else
if (pthread_setaffinity_np(thread, sizeof(hg_cpu_set_t), cpu_mask))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#endif
}

View File

@@ -0,0 +1,242 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_THREAD_H
#define MERCURY_THREAD_H
#if !defined(_WIN32) && !defined(_GNU_SOURCE)
# define _GNU_SOURCE
#endif
#include "mercury_util_config.h"
#ifdef _WIN32
# include <windows.h>
typedef HANDLE hg_thread_t;
typedef LPTHREAD_START_ROUTINE hg_thread_func_t;
typedef DWORD hg_thread_ret_t;
# define HG_THREAD_RETURN_TYPE hg_thread_ret_t WINAPI
typedef DWORD hg_thread_key_t;
typedef DWORD_PTR hg_cpu_set_t;
#else
# include <pthread.h>
typedef pthread_t hg_thread_t;
typedef void *(*hg_thread_func_t)(void *);
typedef void *hg_thread_ret_t;
# define HG_THREAD_RETURN_TYPE hg_thread_ret_t
typedef pthread_key_t hg_thread_key_t;
# ifdef __APPLE__
/* Size definition for CPU sets. */
# define HG_CPU_SETSIZE 1024
# define HG_NCPUBITS (8 * sizeof(hg_cpu_mask_t))
/* Type for array elements in 'cpu_set_t'. */
typedef hg_util_uint64_t hg_cpu_mask_t;
typedef struct {
hg_cpu_mask_t bits[HG_CPU_SETSIZE / HG_NCPUBITS];
} hg_cpu_set_t;
# else
typedef cpu_set_t hg_cpu_set_t;
# endif
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the thread.
*
* \param thread [IN/OUT] pointer to thread object
*/
HG_UTIL_PUBLIC void
hg_thread_init(hg_thread_t *thread);
/**
* Create a new thread for the given function.
*
* \param thread [IN/OUT] pointer to thread object
* \param f [IN] pointer to function
* \param data [IN] pointer to data than be passed to function f
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_create(hg_thread_t *thread, hg_thread_func_t f, void *data);
/**
* Ends the calling thread.
*
* \param ret [IN] exit code for the thread
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC void
hg_thread_exit(hg_thread_ret_t ret);
/**
* Wait for thread completion.
*
* \param thread [IN] thread object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_join(hg_thread_t thread);
/**
* Terminate the thread.
*
* \param thread [IN] thread object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_cancel(hg_thread_t thread);
/**
* Yield the processor.
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_yield(void);
/**
* Obtain handle of the calling thread.
*
* \return
*/
static HG_UTIL_INLINE hg_thread_t
hg_thread_self(void);
/**
* Compare thread IDs.
*
* \return Non-zero if equal, zero if not equal
*/
static HG_UTIL_INLINE int
hg_thread_equal(hg_thread_t t1, hg_thread_t t2);
/**
* Create a thread-specific data key visible to all threads in the process.
*
* \param key [OUT] pointer to thread key object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_key_create(hg_thread_key_t *key);
/**
* Delete a thread-specific data key previously returned by
* hg_thread_key_create().
*
* \param key [IN] thread key object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_key_delete(hg_thread_key_t key);
/**
* Get value from specified key.
*
* \param key [IN] thread key object
*
* \return Pointer to data associated to the key
*/
static HG_UTIL_INLINE void *
hg_thread_getspecific(hg_thread_key_t key);
/**
* Set value to specified key.
*
* \param key [IN] thread key object
* \param value [IN] pointer to data that will be associated
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_setspecific(hg_thread_key_t key, const void *value);
/**
* Get affinity mask.
*
* \param thread [IN] thread object
* \param cpu_mask [IN/OUT] cpu mask
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_getaffinity(hg_thread_t thread, hg_cpu_set_t *cpu_mask);
/**
* Set affinity mask.
*
* \param thread [IN] thread object
* \param cpu_mask [IN] cpu mask
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_setaffinity(hg_thread_t thread, const hg_cpu_set_t *cpu_mask);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_thread_t
hg_thread_self(void)
{
#ifdef _WIN32
return GetCurrentThread();
#else
return pthread_self();
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_equal(hg_thread_t t1, hg_thread_t t2)
{
#ifdef _WIN32
return GetThreadId(t1) == GetThreadId(t2);
#else
return pthread_equal(t1, t2);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE void *
hg_thread_getspecific(hg_thread_key_t key)
{
#ifdef _WIN32
return TlsGetValue(key);
#else
return pthread_getspecific(key);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_setspecific(hg_thread_key_t key, const void *value)
{
#ifdef _WIN32
if (!TlsSetValue(key, (LPVOID) value))
return HG_UTIL_FAIL;
#else
if (pthread_setspecific(key, value))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_H */

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_thread_condition.h"
/*---------------------------------------------------------------------------*/
int
hg_thread_cond_init(hg_thread_cond_t *cond)
{
#ifdef _WIN32
InitializeConditionVariable(cond);
#else
pthread_condattr_t attr;
pthread_condattr_init(&attr);
# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK) && \
defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
/* Must set clock ID if using different clock */
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
# endif
if (pthread_cond_init(cond, &attr))
return HG_UTIL_FAIL;
pthread_condattr_destroy(&attr);
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_cond_destroy(hg_thread_cond_t *cond)
{
#ifndef _WIN32
if (pthread_cond_destroy(cond))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}

View File

@@ -0,0 +1,182 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_THREAD_CONDITION_H
#define MERCURY_THREAD_CONDITION_H
#include "mercury_thread_mutex.h"
#ifdef _WIN32
typedef CONDITION_VARIABLE hg_thread_cond_t;
#else
# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
# include "mercury_time.h"
# elif defined(HG_UTIL_HAS_SYSTIME_H)
# include <sys/time.h>
# endif
# include <stdlib.h>
typedef pthread_cond_t hg_thread_cond_t;
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the condition.
*
* \param cond [IN/OUT] pointer to condition object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_cond_init(hg_thread_cond_t *cond);
/**
* Destroy the condition.
*
* \param cond [IN/OUT] pointer to condition object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_cond_destroy(hg_thread_cond_t *cond);
/**
* Wake one thread waiting for the condition to change.
*
* \param cond [IN/OUT] pointer to condition object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_cond_signal(hg_thread_cond_t *cond);
/**
* Wake all the threads waiting for the condition to change.
*
* \param cond [IN/OUT] pointer to condition object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_cond_broadcast(hg_thread_cond_t *cond);
/**
* Wait for the condition to change.
*
* \param cond [IN/OUT] pointer to condition object
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex);
/**
* Wait timeout ms for the condition to change.
*
* \param cond [IN/OUT] pointer to condition object
* \param mutex [IN/OUT] pointer to mutex object
* \param timeout [IN] timeout (in milliseconds)
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_cond_timedwait(
hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_cond_signal(hg_thread_cond_t *cond)
{
#ifdef _WIN32
WakeConditionVariable(cond);
#else
if (pthread_cond_signal(cond))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_cond_broadcast(hg_thread_cond_t *cond)
{
#ifdef _WIN32
WakeAllConditionVariable(cond);
#else
if (pthread_cond_broadcast(cond))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_cond_wait(hg_thread_cond_t *cond, hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
if (!SleepConditionVariableCS(cond, mutex, INFINITE))
return HG_UTIL_FAIL;
#else
if (pthread_cond_wait(cond, mutex))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_cond_timedwait(
hg_thread_cond_t *cond, hg_thread_mutex_t *mutex, unsigned int timeout)
{
#ifdef _WIN32
if (!SleepConditionVariableCS(cond, mutex, timeout))
return HG_UTIL_FAIL;
#else
# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
hg_time_t now;
# elif defined(HG_UTIL_HAS_SYSTIME_H)
struct timeval now;
# endif
struct timespec abs_timeout;
long int abs_timeout_us;
ldiv_t ld;
/* Need to convert timeout (ms) to absolute time */
# if defined(HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK)
if (hg_time_get_current(&now) != HG_UTIL_SUCCESS)
return HG_UTIL_FAIL;
# elif defined(HG_UTIL_HAS_SYSTIME_H)
if (gettimeofday(&now, NULL) != 0)
return HG_UTIL_FAIL;
# endif
abs_timeout_us = now.tv_usec + timeout * 1000L;
/* Get sec / nsec */
ld = ldiv(abs_timeout_us, 1000000L);
abs_timeout.tv_sec = now.tv_sec + ld.quot;
abs_timeout.tv_nsec = ld.rem * 1000L;
if (pthread_cond_timedwait(cond, mutex, &abs_timeout))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_CONDITION_H */

View File

@@ -0,0 +1,50 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_thread_mutex.h"
/*---------------------------------------------------------------------------*/
int
hg_thread_mutex_init(hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
InitializeCriticalSection(mutex);
#else
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
# ifdef HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP
/* Set type to PTHREAD_MUTEX_ADAPTIVE_NP to improve performance */
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ADAPTIVE_NP);
# else
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_DEFAULT);
# endif
if (pthread_mutex_init(mutex, &mutex_attr))
return HG_UTIL_FAIL;
pthread_mutexattr_destroy(&mutex_attr);
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_mutex_destroy(hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
DeleteCriticalSection(mutex);
#else
if (pthread_mutex_destroy(mutex))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}

View File

@@ -0,0 +1,127 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_THREAD_MUTEX_H
#define MERCURY_THREAD_MUTEX_H
#include "mercury_util_config.h"
#ifdef _WIN32
# include <windows.h>
# define HG_THREAD_MUTEX_INITIALIZER NULL
typedef CRITICAL_SECTION hg_thread_mutex_t;
#else
# include <pthread.h>
# define HG_THREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
typedef pthread_mutex_t hg_thread_mutex_t;
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the mutex.
*
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_mutex_init(hg_thread_mutex_t *mutex);
/**
* Destroy the mutex.
*
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_mutex_destroy(hg_thread_mutex_t *mutex);
/**
* Lock the mutex.
*
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_mutex_lock(hg_thread_mutex_t *mutex);
/**
* Try locking the mutex.
*
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex);
/**
* Unlock the mutex.
*
* \param mutex [IN/OUT] pointer to mutex object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_mutex_unlock(hg_thread_mutex_t *mutex);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_mutex_lock(hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
EnterCriticalSection(mutex);
#else
if (pthread_mutex_lock(mutex))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_mutex_try_lock(hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
if (!TryEnterCriticalSection(mutex))
return HG_UTIL_FAIL;
#else
if (pthread_mutex_trylock(mutex))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_mutex_unlock(hg_thread_mutex_t *mutex)
{
#ifdef _WIN32
LeaveCriticalSection(mutex);
#else
if (pthread_mutex_unlock(mutex))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_MUTEX_H */

View File

@@ -0,0 +1,191 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_thread_pool.h"
#include "mercury_util_error.h"
#include <stdlib.h>
/****************/
/* Local Macros */
/****************/
/************************************/
/* Local Type and Struct Definition */
/************************************/
struct hg_thread_pool_private {
struct hg_thread_pool pool;
unsigned int thread_count;
hg_thread_t *threads;
};
/********************/
/* Local Prototypes */
/********************/
/**
* Worker thread run by the thread pool
*/
static HG_THREAD_RETURN_TYPE
hg_thread_pool_worker(void *args);
/*******************/
/* Local Variables */
/*******************/
/*---------------------------------------------------------------------------*/
static HG_THREAD_RETURN_TYPE
hg_thread_pool_worker(void *args)
{
hg_thread_ret_t ret = 0;
hg_thread_pool_t *pool = (hg_thread_pool_t *) args;
struct hg_thread_work *work;
while (1) {
hg_thread_mutex_lock(&pool->mutex);
/* If not shutting down and nothing to do, worker sleeps */
while (!pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue)) {
int rc;
pool->sleeping_worker_count++;
rc = hg_thread_cond_wait(&pool->cond, &pool->mutex);
HG_UTIL_CHECK_ERROR_NORET(rc != HG_UTIL_SUCCESS, unlock,
"Thread cannot wait on condition variable");
pool->sleeping_worker_count--;
}
if (pool->shutdown && HG_QUEUE_IS_EMPTY(&pool->queue))
goto unlock;
/* Grab our task */
work = HG_QUEUE_FIRST(&pool->queue);
HG_QUEUE_POP_HEAD(&pool->queue, entry);
/* Unlock */
hg_thread_mutex_unlock(&pool->mutex);
/* Get to work */
(*work->func)(work->args);
}
unlock:
hg_thread_mutex_unlock(&pool->mutex);
return ret;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool_ptr)
{
int ret = HG_UTIL_SUCCESS, rc;
struct hg_thread_pool_private *priv_pool = NULL;
unsigned int i;
HG_UTIL_CHECK_ERROR(
pool_ptr == NULL, error, ret, HG_UTIL_FAIL, "NULL pointer");
priv_pool = (struct hg_thread_pool_private *) malloc(
sizeof(struct hg_thread_pool_private));
HG_UTIL_CHECK_ERROR(priv_pool == NULL, error, ret, HG_UTIL_FAIL,
"Could not allocate thread pool");
priv_pool->pool.sleeping_worker_count = 0;
priv_pool->thread_count = thread_count;
priv_pool->threads = NULL;
HG_QUEUE_INIT(&priv_pool->pool.queue);
priv_pool->pool.shutdown = 0;
rc = hg_thread_mutex_init(&priv_pool->pool.mutex);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL,
"Could not initialize mutex");
rc = hg_thread_cond_init(&priv_pool->pool.cond);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL,
"Could not initialize thread condition");
priv_pool->threads =
(hg_thread_t *) malloc(thread_count * sizeof(hg_thread_t));
HG_UTIL_CHECK_ERROR(!priv_pool->threads, error, ret, HG_UTIL_FAIL,
"Could not allocate thread pool array");
/* Start worker threads */
for (i = 0; i < thread_count; i++) {
rc = hg_thread_create(
&priv_pool->threads[i], hg_thread_pool_worker, (void *) priv_pool);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL,
"Could not create thread");
}
*pool_ptr = (struct hg_thread_pool *) priv_pool;
return ret;
error:
if (priv_pool)
hg_thread_pool_destroy((struct hg_thread_pool *) priv_pool);
return ret;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_pool_destroy(hg_thread_pool_t *pool)
{
struct hg_thread_pool_private *priv_pool =
(struct hg_thread_pool_private *) pool;
int ret = HG_UTIL_SUCCESS, rc;
unsigned int i;
if (!priv_pool)
goto done;
if (priv_pool->threads) {
hg_thread_mutex_lock(&priv_pool->pool.mutex);
priv_pool->pool.shutdown = 1;
rc = hg_thread_cond_broadcast(&priv_pool->pool.cond);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, error, ret, HG_UTIL_FAIL,
"Could not broadcast condition signal");
hg_thread_mutex_unlock(&priv_pool->pool.mutex);
for (i = 0; i < priv_pool->thread_count; i++) {
rc = hg_thread_join(priv_pool->threads[i]);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"Could not join thread");
}
}
rc = hg_thread_mutex_destroy(&priv_pool->pool.mutex);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"Could not destroy mutex");
rc = hg_thread_cond_destroy(&priv_pool->pool.cond);
HG_UTIL_CHECK_ERROR(rc != HG_UTIL_SUCCESS, done, ret, HG_UTIL_FAIL,
"Could not destroy thread condition");
free(priv_pool->threads);
free(priv_pool);
done:
return ret;
error:
hg_thread_mutex_unlock(&priv_pool->pool.mutex);
return ret;
}

View File

@@ -0,0 +1,124 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_THREAD_POOL_H
#define MERCURY_THREAD_POOL_H
#include "mercury_queue.h"
#include "mercury_thread.h"
#include "mercury_thread_condition.h"
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
typedef struct hg_thread_pool hg_thread_pool_t;
struct hg_thread_pool {
unsigned int sleeping_worker_count;
HG_QUEUE_HEAD(hg_thread_work) queue;
int shutdown;
hg_thread_mutex_t mutex;
hg_thread_cond_t cond;
};
struct hg_thread_work {
hg_thread_func_t func;
void *args;
HG_QUEUE_ENTRY(hg_thread_work) entry; /* Internal */
};
/*****************/
/* Public Macros */
/*****************/
/*********************/
/* Public Prototypes */
/*********************/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the thread pool.
*
* \param thread_count [IN] number of threads that will be created at
* initialization
* \param pool [OUT] pointer to pool object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_pool_init(unsigned int thread_count, hg_thread_pool_t **pool);
/**
* Destroy the thread pool.
*
* \param pool [IN/OUT] pointer to pool object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_pool_destroy(hg_thread_pool_t *pool);
/**
* Post work to the pool. Note that the operation may be queued depending on
* the number of threads and number of tasks already running.
*
* \param pool [IN/OUT] pointer to pool object
* \param work [IN] pointer to work struct
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_pool_post(hg_thread_pool_t *pool, struct hg_thread_work *work)
{
int ret = HG_UTIL_SUCCESS;
if (!pool || !work)
return HG_UTIL_FAIL;
if (!work->func)
return HG_UTIL_FAIL;
hg_thread_mutex_lock(&pool->mutex);
/* Are we shutting down ? */
if (pool->shutdown) {
ret = HG_UTIL_FAIL;
goto unlock;
}
/* Add task to task queue */
HG_QUEUE_PUSH_TAIL(&pool->queue, work, entry);
/* Wake up sleeping worker */
if (pool->sleeping_worker_count &&
(hg_thread_cond_signal(&pool->cond) != HG_UTIL_SUCCESS))
ret = HG_UTIL_FAIL;
unlock:
hg_thread_mutex_unlock(&pool->mutex);
return ret;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_POOL_H */

View File

@@ -0,0 +1,77 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Copyright (C) 2017 Intel Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted for any purpose (including commercial purposes)
* provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions, and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions, and the following disclaimer in the
* documentation and/or materials provided with the distribution.
*
* 3. In addition, redistributions of modified forms of the source or binary
* code must carry prominent notices stating that the original code was
* changed and the date of the change.
*
* 4. All publications or advertising materials mentioning features or use of
* this software are asked, but not required, to acknowledge that it was
* developed by Intel Corporation and credit the contributors.
*
* 5. Neither the name of Intel Corporation, nor the name of any Contributor
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "mercury_thread_rwlock.h"
/*---------------------------------------------------------------------------*/
int
hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
InitializeSRWLock(rwlock);
#else
if (pthread_rwlock_init(rwlock, NULL))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
int
hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
/* nothing to do */
#else
if (pthread_rwlock_destroy(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}

View File

@@ -0,0 +1,236 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Copyright (C) 2017 Intel Corporation
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted for any purpose (including commercial purposes)
* provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions, and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions, and the following disclaimer in the
* documentation and/or materials provided with the distribution.
*
* 3. In addition, redistributions of modified forms of the source or binary
* code must carry prominent notices stating that the original code was
* changed and the date of the change.
*
* 4. All publications or advertising materials mentioning features or use of
* this software are asked, but not required, to acknowledge that it was
* developed by Intel Corporation and credit the contributors.
*
* 5. Neither the name of Intel Corporation, nor the name of any Contributor
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MERCURY_THREAD_RWLOCK_H
#define MERCURY_THREAD_RWLOCK_H
#include "mercury_util_config.h"
#ifdef _WIN32
# include <windows.h>
typedef PSRWLOCK hg_thread_rwlock_t;
#else
# include <pthread.h>
typedef pthread_rwlock_t hg_thread_rwlock_t;
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_rwlock_init(hg_thread_rwlock_t *rwlock);
/**
* Destroy the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_rwlock_destroy(hg_thread_rwlock_t *rwlock);
/**
* Take a read lock for the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock);
/**
* Try to take a read lock for the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock);
/**
* Release the read lock of the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock);
/**
* Take a write lock for the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock);
/**
* Try to take a write lock for the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock);
/**
* Release the write lock of the rwlock.
*
* \param rwlock [IN/OUT] pointer to rwlock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_rdlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
AcquireSRWLockShared(rwlock);
#else
if (pthread_rwlock_rdlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_try_rdlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
if (TryAcquireSRWLockShared(rwlock) == 0)
return HG_UTIL_FAIL;
#else
if (pthread_rwlock_tryrdlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_release_rdlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
ReleaseSRWLockShared(rwlock);
#else
if (pthread_rwlock_unlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_wrlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
ReleaseSRWLockExclusive(rwlock);
#else
if (pthread_rwlock_wrlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_try_wrlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
if (TryAcquireSRWLockExclusive(rwlock) == 0)
return HG_UTIL_FAIL;
#else
if (pthread_rwlock_trywrlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_rwlock_release_wrlock(hg_thread_rwlock_t *rwlock)
{
#ifdef _WIN32
ReleaseSRWLockExclusive(rwlock);
#else
if (pthread_rwlock_unlock(rwlock))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_RWLOCK_H */

View File

@@ -0,0 +1,47 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_thread_spin.h"
/*---------------------------------------------------------------------------*/
int
hg_thread_spin_init(hg_thread_spin_t *lock)
{
#if defined(_WIN32)
*lock = 0;
return HG_UTIL_SUCCESS;
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
if (pthread_spin_init(lock, 0))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#else
return hg_thread_mutex_init(lock);
#endif
}
/*---------------------------------------------------------------------------*/
int
hg_thread_spin_destroy(hg_thread_spin_t *lock)
{
#if defined(_WIN32)
(void) lock;
return HG_UTIL_SUCCESS;
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
if (pthread_spin_destroy(lock))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#else
return hg_thread_mutex_destroy(lock);
#endif
}

View File

@@ -0,0 +1,146 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_THREAD_SPIN_H
#define MERCURY_THREAD_SPIN_H
#include "mercury_util_config.h"
#if defined(_WIN32)
# include <windows.h>
typedef volatile LONG hg_thread_spin_t;
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
# include <pthread.h>
typedef pthread_spinlock_t hg_thread_spin_t;
#else
/* Default to hg_thread_mutex_t if pthread_spinlock_t is not supported */
# include "mercury_thread_mutex.h"
typedef hg_thread_mutex_t hg_thread_spin_t;
#endif
#ifdef __cplusplus
extern "C" {
#endif
/**
* Initialize the spin lock.
*
* \param lock [IN/OUT] pointer to lock object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_spin_init(hg_thread_spin_t *lock);
/**
* Destroy the spin lock.
*
* \param lock [IN/OUT] pointer to lock object
*
* \return Non-negative on success or negative on failure
*/
HG_UTIL_PUBLIC int
hg_thread_spin_destroy(hg_thread_spin_t *lock);
/**
* Lock the spin lock.
*
* \param lock [IN/OUT] pointer to lock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_spin_lock(hg_thread_spin_t *lock);
/**
* Try locking the spin lock.
*
* \param mutex [IN/OUT] pointer to lock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_spin_try_lock(hg_thread_spin_t *lock);
/**
* Unlock the spin lock.
*
* \param mutex [IN/OUT] pointer to lock object
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_thread_spin_unlock(hg_thread_spin_t *lock);
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_spin_lock(hg_thread_spin_t *lock)
{
#if defined(_WIN32)
while (InterlockedExchange(lock, EBUSY)) {
/* Don't lock while waiting */
while (*lock) {
YieldProcessor();
/* Compiler barrier. Prevent caching of *lock */
MemoryBarrier();
}
}
return HG_UTIL_SUCCESS;
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
if (pthread_spin_lock(lock))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#else
return hg_thread_mutex_lock(lock);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_spin_try_lock(hg_thread_spin_t *lock)
{
#if defined(_WIN32)
return InterlockedExchange(lock, EBUSY);
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
if (pthread_spin_trylock(lock))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#else
return hg_thread_mutex_try_lock(lock);
#endif
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_thread_spin_unlock(hg_thread_spin_t *lock)
{
#if defined(_WIN32)
/* Compiler barrier. The store below acts with release semantics */
MemoryBarrier();
*lock = 0;
return HG_UTIL_SUCCESS;
#elif defined(HG_UTIL_HAS_PTHREAD_SPINLOCK_T)
if (pthread_spin_unlock(lock))
return HG_UTIL_FAIL;
return HG_UTIL_SUCCESS;
#else
return hg_thread_mutex_unlock(lock);
#endif
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_THREAD_SPIN_H */

402
src/mercury/mercury_time.h Normal file
View File

@@ -0,0 +1,402 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_TIME_H
#define MERCURY_TIME_H
#include "mercury_util_config.h"
#if defined(_WIN32)
# include <windows.h>
#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME)
# include <time.h>
# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H)
# include <mach/mach_time.h>
# include <sys/time.h>
# else
# error "Not supported on this platform."
# endif
#else
# include <stdio.h>
# include <unistd.h>
# if defined(HG_UTIL_HAS_SYSTIME_H)
# include <sys/time.h>
# else
# error "Not supported on this platform."
# endif
#endif
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
typedef struct hg_time hg_time_t;
struct hg_time {
long tv_sec;
long tv_usec;
};
/*****************/
/* Public Macros */
/*****************/
/*********************/
/* Public Prototypes */
/*********************/
#ifdef __cplusplus
extern "C" {
#endif
/**
* Get an elapsed time on the calling processor.
*
* \param tv [OUT] pointer to returned time structure
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_time_get_current(hg_time_t *tv);
/**
* Convert hg_time_t to double.
*
* \param tv [IN] time structure
*
* \return Converted time in seconds
*/
static HG_UTIL_INLINE double
hg_time_to_double(hg_time_t tv);
/**
* Convert double to hg_time_t.
*
* \param d [IN] time in seconds
*
* \return Converted time structure
*/
static HG_UTIL_INLINE hg_time_t
hg_time_from_double(double d);
/**
* Compare time values.
*
* \param in1 [IN] time structure
* \param in2 [IN] time structure
*
* \return 1 if in1 < in2, 0 otherwise
*/
static HG_UTIL_INLINE int
hg_time_less(hg_time_t in1, hg_time_t in2);
/**
* Add time values.
*
* \param in1 [IN] time structure
* \param in2 [IN] time structure
*
* \return Summed time structure
*/
static HG_UTIL_INLINE hg_time_t
hg_time_add(hg_time_t in1, hg_time_t in2);
/**
* Subtract time values.
*
* \param in1 [IN] time structure
* \param in2 [IN] time structure
*
* \return Subtracted time structure
*/
static HG_UTIL_INLINE hg_time_t
hg_time_subtract(hg_time_t in1, hg_time_t in2);
/**
* Sleep until the time specified in rqt has elapsed.
*
* \param reqt [IN] time structure
*
* \return Non-negative on success or negative on failure
*/
static HG_UTIL_INLINE int
hg_time_sleep(const hg_time_t rqt);
/**
* Get a string containing current time/date stamp.
*
* \return Valid string or NULL on failure
*/
static HG_UTIL_INLINE char *
hg_time_stamp(void);
/*---------------------------------------------------------------------------*/
#ifdef _WIN32
static HG_UTIL_INLINE LARGE_INTEGER
get_FILETIME_offset(void)
{
SYSTEMTIME s;
FILETIME f;
LARGE_INTEGER t;
s.wYear = 1970;
s.wMonth = 1;
s.wDay = 1;
s.wHour = 0;
s.wMinute = 0;
s.wSecond = 0;
s.wMilliseconds = 0;
SystemTimeToFileTime(&s, &f);
t.QuadPart = f.dwHighDateTime;
t.QuadPart <<= 32;
t.QuadPart |= f.dwLowDateTime;
return t;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_time_get_current(hg_time_t *tv)
{
LARGE_INTEGER t;
FILETIME f;
double t_usec;
static LARGE_INTEGER offset;
static double freq_to_usec;
static int initialized = 0;
static BOOL use_perf_counter = 0;
if (!tv)
return HG_UTIL_FAIL;
if (!initialized) {
LARGE_INTEGER perf_freq;
initialized = 1;
use_perf_counter = QueryPerformanceFrequency(&perf_freq);
if (use_perf_counter) {
QueryPerformanceCounter(&offset);
freq_to_usec = (double) perf_freq.QuadPart / 1000000.;
} else {
offset = get_FILETIME_offset();
freq_to_usec = 10.;
}
}
if (use_perf_counter) {
QueryPerformanceCounter(&t);
} else {
GetSystemTimeAsFileTime(&f);
t.QuadPart = f.dwHighDateTime;
t.QuadPart <<= 32;
t.QuadPart |= f.dwLowDateTime;
}
t.QuadPart -= offset.QuadPart;
t_usec = (double) t.QuadPart / freq_to_usec;
t.QuadPart = t_usec;
tv->tv_sec = t.QuadPart / 1000000;
tv->tv_usec = t.QuadPart % 1000000;
return HG_UTIL_SUCCESS;
}
#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
/*---------------------------------------------------------------------------*/
# if defined(HG_UTIL_HAS_TIME_H) && defined(HG_UTIL_HAS_CLOCK_GETTIME)
static HG_UTIL_INLINE int
hg_time_get_current(hg_time_t *tv)
{
struct timespec tp = {0, 0};
/* NB. CLOCK_MONOTONIC_RAW is not explicitly supported in the vdso */
clockid_t clock_id = CLOCK_MONOTONIC;
if (!tv)
return HG_UTIL_FAIL;
clock_gettime(clock_id, &tp);
tv->tv_sec = tp.tv_sec;
tv->tv_usec = tp.tv_nsec / 1000;
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
# elif defined(__APPLE__) && defined(HG_UTIL_HAS_SYSTIME_H)
static HG_UTIL_INLINE int
hg_time_get_current(hg_time_t *tv)
{
static uint64_t monotonic_timebase_factor = 0;
uint64_t monotonic_nsec;
if (!tv)
return HG_UTIL_FAIL;
if (monotonic_timebase_factor == 0) {
mach_timebase_info_data_t timebase_info;
(void) mach_timebase_info(&timebase_info);
monotonic_timebase_factor = timebase_info.numer / timebase_info.denom;
}
monotonic_nsec = (mach_absolute_time() * monotonic_timebase_factor);
tv->tv_sec = (long) (monotonic_nsec / 1000000000);
tv->tv_usec = (long) ((monotonic_nsec - (uint64_t) tv->tv_sec) / 1000);
return HG_UTIL_SUCCESS;
}
# endif
#else
/*---------------------------------------------------------------------------*/
# if defined(HG_UTIL_HAS_SYSTIME_H)
static HG_UTIL_INLINE int
hg_time_get_current(hg_time_t *tv)
{
if (!tv)
return HG_UTIL_FAIL;
gettimeofday((struct timeval *) tv, NULL);
return HG_UTIL_SUCCESS;
}
# endif
#endif
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE double
hg_time_to_double(hg_time_t tv)
{
return (double) tv.tv_sec + (double) (tv.tv_usec) * 0.000001;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_time_t
hg_time_from_double(double d)
{
hg_time_t tv;
tv.tv_sec = (long) d;
tv.tv_usec = (long) ((d - (double) (tv.tv_sec)) * 1000000);
return tv;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_time_less(hg_time_t in1, hg_time_t in2)
{
return ((in1.tv_sec < in2.tv_sec) ||
((in1.tv_sec == in2.tv_sec) && (in1.tv_usec < in2.tv_usec)));
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_time_t
hg_time_add(hg_time_t in1, hg_time_t in2)
{
hg_time_t out;
out.tv_sec = in1.tv_sec + in2.tv_sec;
out.tv_usec = in1.tv_usec + in2.tv_usec;
if (out.tv_usec > 1000000) {
out.tv_usec -= 1000000;
out.tv_sec += 1;
}
return out;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE hg_time_t
hg_time_subtract(hg_time_t in1, hg_time_t in2)
{
hg_time_t out;
out.tv_sec = in1.tv_sec - in2.tv_sec;
out.tv_usec = in1.tv_usec - in2.tv_usec;
if (out.tv_usec < 0) {
out.tv_usec += 1000000;
out.tv_sec -= 1;
}
return out;
}
/*---------------------------------------------------------------------------*/
static HG_UTIL_INLINE int
hg_time_sleep(const hg_time_t rqt)
{
#ifdef _WIN32
DWORD dwMilliseconds = (DWORD)(hg_time_to_double(rqt) / 1000);
Sleep(dwMilliseconds);
#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
struct timespec rqtp;
rqtp.tv_sec = rqt.tv_sec;
rqtp.tv_nsec = rqt.tv_usec * 1000;
if (nanosleep(&rqtp, NULL))
return HG_UTIL_FAIL;
#else
useconds_t usec =
(useconds_t) rqt.tv_sec * 1000000 + (useconds_t) rqt.tv_usec;
if (usleep(usec))
return HG_UTIL_FAIL;
#endif
return HG_UTIL_SUCCESS;
}
/*---------------------------------------------------------------------------*/
#define HG_UTIL_STAMP_MAX 128
static HG_UTIL_INLINE char *
hg_time_stamp(void)
{
static char buf[HG_UTIL_STAMP_MAX] = {'\0'};
#if defined(_WIN32)
/* TODO not implemented */
#elif defined(HG_UTIL_HAS_CLOCK_MONOTONIC)
struct tm *local_time;
time_t t;
t = time(NULL);
local_time = localtime(&t);
if (local_time == NULL)
return NULL;
if (strftime(buf, HG_UTIL_STAMP_MAX, "%a, %d %b %Y %T %Z", local_time) == 0)
return NULL;
#else
struct timeval tv;
struct timezone tz;
unsigned long days, hours, minutes, seconds;
gettimeofday(&tv, &tz);
days = (unsigned long) tv.tv_sec / (3600 * 24);
hours = ((unsigned long) tv.tv_sec - days * 24 * 3600) / 3600;
minutes =
((unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600) / 60;
seconds = (unsigned long) tv.tv_sec - days * 24 * 3600 - hours * 3600 -
minutes * 60;
hours -= (unsigned long) tz.tz_minuteswest / 60;
snprintf(buf, HG_UTIL_STAMP_MAX, "%02lu:%02lu:%02lu (GMT-%d)", hours,
minutes, seconds, tz.tz_minuteswest / 60);
#endif
return buf;
}
#ifdef __cplusplus
}
#endif
#endif /* MERCURY_TIME_H */

View File

@@ -0,0 +1,141 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
/* Generated file. Only edit mercury_util_config.h.in. */
#ifndef MERCURY_UTIL_CONFIG_H
#define MERCURY_UTIL_CONFIG_H
/*************************************/
/* Public Type and Struct Definition */
/*************************************/
/* Type definitions */
#ifdef _WIN32
typedef signed __int64 hg_util_int64_t;
typedef signed __int32 hg_util_int32_t;
typedef signed __int16 hg_util_int16_t;
typedef signed __int8 hg_util_int8_t;
typedef unsigned __int64 hg_util_uint64_t;
typedef unsigned __int32 hg_util_uint32_t;
typedef unsigned __int16 hg_util_uint16_t;
typedef unsigned __int8 hg_util_uint8_t;
#else
# include <stddef.h>
# include <stdint.h>
typedef int64_t hg_util_int64_t;
typedef int32_t hg_util_int32_t;
typedef int16_t hg_util_int16_t;
typedef int8_t hg_util_int8_t;
typedef uint64_t hg_util_uint64_t;
typedef uint32_t hg_util_uint32_t;
typedef uint16_t hg_util_uint16_t;
typedef uint8_t hg_util_uint8_t;
#endif
typedef hg_util_uint8_t hg_util_bool_t;
typedef hg_util_uint64_t hg_util_ptr_t;
/* True / false */
#define HG_UTIL_TRUE 1
#define HG_UTIL_FALSE 0
/* Return codes */
#define HG_UTIL_SUCCESS 0
#define HG_UTIL_FAIL -1
/*****************/
/* Public Macros */
/*****************/
/* Visibility of symbols */
#if defined(_WIN32)
# define HG_UTIL_ABI_IMPORT __declspec(dllimport)
# define HG_UTIL_ABI_EXPORT __declspec(dllexport)
# define HG_UTIL_ABI_HIDDEN
#elif defined(__GNUC__) && (__GNUC__ >= 4)
# define HG_UTIL_ABI_IMPORT __attribute__((visibility("default")))
# define HG_UTIL_ABI_EXPORT __attribute__((visibility("default")))
# define HG_UTIL_ABI_HIDDEN __attribute__((visibility("hidden")))
#else
# define HG_UTIL_ABI_IMPORT
# define HG_UTIL_ABI_EXPORT
# define HG_UTIL_ABI_HIDDEN
#endif
/* Inline macro */
#ifdef _WIN32
# define HG_UTIL_INLINE __inline
#else
# define HG_UTIL_INLINE __inline__
#endif
/* Shared libraries */
#define HG_UTIL_BUILD_SHARED_LIBS
#ifdef HG_UTIL_BUILD_SHARED_LIBS
# ifdef mercury_util_EXPORTS
# define HG_UTIL_PUBLIC HG_UTIL_ABI_EXPORT
# else
# define HG_UTIL_PUBLIC HG_UTIL_ABI_IMPORT
# endif
# define HG_UTIL_PRIVATE HG_UTIL_ABI_HIDDEN
#else
# define HG_UTIL_PUBLIC
# define HG_UTIL_PRIVATE
#endif
/* Define if has 'clock_gettime()' */
#define HG_UTIL_HAS_CLOCK_GETTIME
/* Define if has CLOCK_MONOTONIC */
/* #undef HG_UTIL_HAS_CLOCK_MONOTONIC */
/* Define if has eventfd_t type */
#define HG_UTIL_HAS_EVENTFD_T
/* Define if has colored output */
/* #undef HG_UTIL_HAS_LOG_COLOR */
/* Define if has <opa_primitives.h> */
/* #undef HG_UTIL_HAS_OPA_PRIMITIVES_H */
/* Define if has 'pthread_condattr_setclock()' */
#define HG_UTIL_HAS_PTHREAD_CONDATTR_SETCLOCK
/* Define if has PTHREAD_MUTEX_ADAPTIVE_NP */
#define HG_UTIL_HAS_PTHREAD_MUTEX_ADAPTIVE_NP
/* Define if has pthread_spinlock_t type */
#define HG_UTIL_HAS_PTHREAD_SPINLOCK_T
/* Define if has <stdatomic.h> */
#define HG_UTIL_HAS_STDATOMIC_H
/* Define type size of atomic_long */
#define HG_UTIL_ATOMIC_LONG_WIDTH 8
/* Define if has <sys/epoll.h> */
#define HG_UTIL_HAS_SYSEPOLL_H
/* Define if has <sys/event.h> */
/* #undef HG_UTIL_HAS_SYSEVENT_H */
/* Define if has <sys/eventfd.h> */
#define HG_UTIL_HAS_SYSEVENTFD_H
/* Define if has <sys/time.h> */
#define HG_UTIL_HAS_SYSTIME_H
/* Define if has <time.h> */
#define HG_UTIL_HAS_TIME_H
/* Define if has verbose error */
#define HG_UTIL_HAS_VERBOSE_ERROR
#endif /* MERCURY_UTIL_CONFIG_H */

View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#include "mercury_util_error.h"
/*******************/
/* Local Variables */
/*******************/
/* Default error log mask */
#ifdef HG_UTIL_HAS_VERBOSE_ERROR
unsigned int HG_UTIL_LOG_MASK = HG_LOG_TYPE_ERROR | HG_LOG_TYPE_WARNING;
#endif

View File

@@ -0,0 +1,104 @@
/*
* Copyright (C) 2013-2019 Argonne National Laboratory, Department of Energy,
* UChicago Argonne, LLC and The HDF Group.
* All rights reserved.
*
* The full copyright notice, including terms governing use, modification,
* and redistribution, is contained in the COPYING file that can be
* found at the root of the source code distribution tree.
*/
#ifndef MERCURY_UTIL_ERROR_H
#define MERCURY_UTIL_ERROR_H
#include "mercury_util_config.h"
/* Default error macro */
#ifdef HG_UTIL_HAS_VERBOSE_ERROR
# include <mercury/mercury_log.h>
# define HG_UTIL_LOG_MASK hg_util_log_mask
/* Log mask will be initialized in init routine */
extern HG_UTIL_PRIVATE unsigned int HG_UTIL_LOG_MASK;
# define HG_UTIL_LOG_MODULE_NAME "HG Util"
# define HG_UTIL_LOG_ERROR(...) \
do { \
if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_ERROR) \
HG_LOG_WRITE_ERROR(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
} while (0)
# ifdef HG_UTIL_HAS_DEBUG
# define HG_UTIL_LOG_DEBUG(...) \
do { \
if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_DEBUG) \
HG_LOG_WRITE_DEBUG(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
} while (0)
# else
# define HG_UTIL_LOG_DEBUG(...) (void) 0
# endif
# define HG_UTIL_LOG_WARNING(...) \
do { \
if (HG_UTIL_LOG_MASK & HG_LOG_TYPE_WARNING) \
HG_LOG_WRITE_WARNING(HG_UTIL_LOG_MODULE_NAME, __VA_ARGS__); \
} while (0)
#else
# define HG_UTIL_LOG_ERROR(...) (void) 0
# define HG_UTIL_LOG_DEBUG(...) (void) 0
# define HG_UTIL_LOG_WARNING(...) (void) 0
#endif
/* Branch predictor hints */
#ifndef _WIN32
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
#else
# define likely(x) (x)
# define unlikely(x) (x)
#endif
/* Error macros */
#define HG_UTIL_GOTO_DONE(label, ret, ret_val) \
do { \
ret = ret_val; \
goto label; \
} while (0)
#define HG_UTIL_GOTO_ERROR(label, ret, err_val, ...) \
do { \
HG_UTIL_LOG_ERROR(__VA_ARGS__); \
ret = err_val; \
goto label; \
} while (0)
/* Check for cond, set ret to err_val and goto label */
#define HG_UTIL_CHECK_ERROR(cond, label, ret, err_val, ...) \
do { \
if (unlikely(cond)) { \
HG_UTIL_LOG_ERROR(__VA_ARGS__); \
ret = err_val; \
goto label; \
} \
} while (0)
#define HG_UTIL_CHECK_ERROR_NORET(cond, label, ...) \
do { \
if (unlikely(cond)) { \
HG_UTIL_LOG_ERROR(__VA_ARGS__); \
goto label; \
} \
} while (0)
#define HG_UTIL_CHECK_ERROR_DONE(cond, ...) \
do { \
if (unlikely(cond)) { \
HG_UTIL_LOG_ERROR(__VA_ARGS__); \
} \
} while (0)
/* Check for cond and print warning */
#define HG_UTIL_CHECK_WARNING(cond, ...) \
do { \
if (unlikely(cond)) { \
HG_UTIL_LOG_WARNING(__VA_ARGS__); \
} \
} while (0)
#endif /* MERCURY_UTIL_ERROR_H */

View File

@@ -3268,7 +3268,895 @@ error:
#undef SPLITTER_TEST_FAULT
/*****************************************************************************
*
* Function setup_rand()
*
* Purpose: Use gettimeofday() to obtain a seed for rand(), print the
* seed to stdout, and then pass it to srand().
*
* This is a version of the same routine in
* testpar/t_cache.c modified for use in serial tests.
*
* Return: void.
*
* Programmer: JRM -- 6/20/20
*
* Modifications:
*
* None.
*
*****************************************************************************/
static void
setup_rand(void)
{
hbool_t use_predefined_seed = FALSE;
unsigned predefined_seed = 18669;
unsigned seed;
struct timeval tv;
if ( use_predefined_seed ) {
seed = predefined_seed;
HDfprintf(stdout, "\n%s: predefined_seed = %d.\n\n", FUNC, seed);
HDfflush(stdout);
HDsrand(seed);
} else {
if ( HDgettimeofday(&tv, NULL) != 0 ) {
HDfprintf(stdout,
"\n%s: gettimeofday() failed -- srand() not called.\n\n",
FUNC);
HDfflush(stdout);
} else {
seed = (unsigned)tv.tv_usec;
HDfprintf(stdout, "\n%s: seed = %d.\n\n", FUNC, seed);
HDfflush(stdout);
HDsrand(seed);
}
}
return;
} /* setup_rand() */
/*-------------------------------------------------------------------------
* Function: test_vector_io__setup_v
*
* Purpose: Construct and initialize a vector of I/O requests used
* to test vector I/O. Note that while the vectors are
* allocated and initialized, they are not assigned
* base addresses.
*
* All arrays parameters are presumed to be of length
* count.
*
* Return: Return TRUE if sucessful, and FALSE if any errors
* are encountered.
*
* Programmer: John Mainzer
* 6/21/20
*
* Modifications:
*
* None.
*
*-------------------------------------------------------------------------
*/
static hbool_t
test_vector_io__setup_v(uint32_t count, H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], void * write_bufs[], void * read_bufs[],
char base_fill_char)
{
hbool_t result = TRUE; /* will set to FALSE on failure */
char fill_char = base_fill_char;
uint32_t i;
uint32_t j;
H5FD_mem_t mem_types[6] = {H5FD_MEM_SUPER, H5FD_MEM_BTREE, H5FD_MEM_DRAW,
H5FD_MEM_GHEAP, H5FD_MEM_LHEAP, H5FD_MEM_OHDR};
/* set the arrays of pointers to the write and read buffers to NULL,
* so that we can release memory on failure.
*/
for ( i = 0; i < count; i++ ) {
write_bufs[i] = NULL;
read_bufs[i] = NULL;
}
for ( i = 0; i < count; i++ ) {
types[i] = mem_types[i % 6];
addrs[i] = HADDR_UNDEF;
sizes[i] = (size_t)((rand() & 1023) + 1);
write_bufs[i] = HDmalloc(sizes[i] + 1);
read_bufs[i] = HDmalloc(sizes[i] + 1);
if ( ( NULL == write_bufs[i] ) || ( NULL == read_bufs[i] ) ) {
HDfprintf(stderr, "%s: can't malloc read / write bufs.\n", FUNC);
result = FALSE;
break;
}
for ( j = 0; j < sizes[i]; j++ ) {
((char *)(write_bufs[i]))[j] = fill_char;
((char *)(read_bufs[i]))[j] = '\0';
}
((char *)(write_bufs[i]))[sizes[i]] = '\0';
((char *)(read_bufs[i]))[sizes[i]] = '\0';
fill_char++;
}
if ( ! result ) { /* free buffers */
for ( i = 0; i < count; i++ ) {
if ( write_bufs[i] ) {
HDfree(write_bufs[i]);
write_bufs[i] = NULL;
}
if ( read_bufs[i] ) {
HDfree(read_bufs[i]);
read_bufs[i] = NULL;
}
}
}
return(result);
} /* end test_vector_io__setup_v() */
/*-------------------------------------------------------------------------
* Function: test_vector_io__read_v_indiv
*
* Purpose: Read the supplied vector as a sequence of individual
* reads.
*
* All arrays parameters are presumed to be of length
* count.
*
* Return: Return TRUE if sucessful, and FALSE if any errors
* are encountered.
*
* Programmer: John Mainzer
* 6/21/20
*
* Modifications:
*
* None.
*
*-------------------------------------------------------------------------
*/
static hbool_t
test_vector_io__read_v_indiv(H5FD_t * lf, uint32_t count, H5FD_mem_t types[],
haddr_t addrs[], size_t sizes[], void * read_bufs[])
{
hbool_t result = TRUE; /* will set to FALSE on failure */
hbool_t verbose = FALSE;
uint32_t i;
for ( i = 0; i < count; i++ ) {
if ( H5FDread(lf, types[i], H5P_DEFAULT, addrs[i], sizes[i],
read_bufs[i]) < 0 ) {
if ( verbose ) {
HDfprintf(stdout, "%s: HDread() failed on entry %d.\n",
FUNC, i);
}
result = FALSE;
break;
}
}
return(result);
} /* end test_vector_io__read_v_indiv() */
/*-------------------------------------------------------------------------
* Function: test_vector_io__write_v_indiv
*
* Purpose: Write the supplied vector as a sequence of individual
* writes.
*
* All arrays parameters are presumed to be of length
* count.
*
* Return: Return TRUE if sucessful, and FALSE if any errors
* are encountered.
*
* Programmer: John Mainzer
* 6/21/20
*
* Modifications:
*
* None.
*
*-------------------------------------------------------------------------
*/
static hbool_t
test_vector_io__write_v_indiv(H5FD_t * lf, uint32_t count, H5FD_mem_t types[],
haddr_t addrs[], size_t sizes[], void * write_bufs[])
{
hbool_t result = TRUE; /* will set to FALSE on failure */
hbool_t verbose = FALSE;
uint32_t i;
for ( i = 0; i < count; i++ ) {
if ( H5FDwrite(lf, types[i], H5P_DEFAULT, addrs[i], sizes[i],
write_bufs[i]) < 0 ) {
if ( verbose ) {
HDfprintf(stdout, "%s: HDwrite() failed on entry %d.\n",
FUNC, i);
}
result = FALSE;
break;
}
}
return(result);
} /* end test_vector_io__write_v_indiv() */
/*-------------------------------------------------------------------------
*
* Function: test_vector_io__verify_v
*
* Purpose: Verify that the read and write buffers of the supplied
* vectors are identical.
*
* Return: TRUE if the read and write vectors are identical, and
* FALSE otherwise.
*
* Programmer: John Mainzer
* 6/21/20
*
* Changes: None.
*
*-------------------------------------------------------------------------
*/
static hbool_t
test_vector_io__verify_v(uint32_t count, H5FD_mem_t types[],
size_t sizes[], void * write_bufs[], void * read_bufs[], const char * name)
{
hbool_t identical = TRUE;
hbool_t verbose = FALSE;
uint32_t i;
uint32_t j;
uint32_t buf_size;
char * w_buf;
char * r_buf;
const char * mem_type_names[7] = {"H5FD_MEM_DEFAULT", "H5FD_MEM_SUPER",
"H5FD_MEM_BTREE", "H5FD_MEM_DRAW",
"H5FD_MEM_GHEAP", "H5FD_MEM_LHEAP",
"H5FD_MEM_OHDR"};
i = 0;
while ( ( i < count ) && ( identical ) ) {
buf_size = (uint32_t)(sizes[i]);
w_buf = (char *)(write_bufs[i]);
r_buf = (char *)(read_bufs[i]);
j = 0;
while ( ( j < buf_size ) && ( identical ) ) {
if ( w_buf[j] != r_buf[j] ) {
identical = FALSE;
if ( verbose ) {
HDfprintf(stdout,
"\n\nread/write buf mismatch in vector/entry");
HDfprintf(stdout,
"\"%s\"/%d at offset %d/%d w/r = %c/%c type = %s\n\n",
name, i, j, buf_size, w_buf[j], r_buf[j],
mem_type_names[types[i]]);
}
}
j++;
}
i++;
}
return(identical);
} /* end test_vector_io__verify_v() */
/*-------------------------------------------------------------------------
*
* Function: test_vector_io__dump_test_vectors
*
* Purpose: Print a set of test vectors to stdout.
* Vectors are assumed to be of length count, and
* buffers must be either NULL, or null terminate strings
* of char.
*
* Return: void.
*
* Programmer: John Mainzer
* 6/21/20
*
* Changes: None.
*
*-------------------------------------------------------------------------
*/
static void
test_vector_io__dump_test_vectors(uint32_t count, H5FD_mem_t types[],
haddr_t addrs[], size_t sizes[], void * write_bufs[], void * read_bufs[],
const char * name)
{
uint32_t i;
const char * mem_type_names[7] = {"H5FD_MEM_DEFAULT", "H5FD_MEM_SUPER",
"H5FD_MEM_BTREE", "H5FD_MEM_DRAW",
"H5FD_MEM_GHEAP", "H5FD_MEM_LHEAP",
"H5FD_MEM_OHDR"};
char * w_buf;
char * r_buf;
HDfprintf(stdout, "\n\nDumping test vector \"%s\" of length %d\n\n",
name, count);
for ( i = 0; i < count; i++ ) {
HDassert((H5FD_MEM_DEFAULT <= types[i]) &&
(types[i] <= H5FD_MEM_OHDR));
w_buf = (char *)(write_bufs[i]);
if ( read_bufs ) {
r_buf = (char *)(read_bufs[i]);
} else {
r_buf = NULL;
}
HDfprintf(stdout,
"%d: addr/len = %lld/%lld, type = %s, w_buf = \"%s\"\n",
i, (long long)(addrs[i]), (long long)(sizes[i]),
mem_type_names[types[i]], w_buf);
if ( r_buf ) {
HDfprintf(stdout, " r_buf = \"%s\"\n", r_buf);
}
}
return;
} /* end test_vector_io__dump_test_vectors() */
/*-------------------------------------------------------------------------
* Function: test_vector_io
*
* Purpose: Test I/O using the vector I/O VFD public VFD calls.
*
* Test proceeds as follows:
*
* 1) read / write vectors and verify results
*
* 2) write individual / read vector and verify results
*
* 3) write vector / read individual and verify results
*
* 4) Close and then re-open the file, verify data written
* above.
*
* Return: Success: 0
* Failure: -1
*
* Programmer: John Mainzer
* 6/20/20
*
* Changes: None.
*
*-------------------------------------------------------------------------
*/
#define VECTOR_LEN 16
static herr_t
test_vector_io(const char * vfd_name)
{
char test_title[80];
hbool_t verbose = FALSE;
hid_t fapl_id = -1; /* file access property list ID */
haddr_t eoa; /* file eoa */
char filename[1024]; /* filename */
char * buf; /* tmp ptr to buf */
unsigned flags = 0; /* file open flags */
H5FD_t * lf; /* VFD struct ptr */
uint32_t i; /* index */
uint32_t j; /* index */
uint32_t count = VECTOR_LEN; /* length of vectors */
H5FD_mem_t types_0[VECTOR_LEN]; /* types vector */
H5FD_mem_t types_1[VECTOR_LEN]; /* types vector */
H5FD_mem_t types_2[VECTOR_LEN]; /* types vector */
haddr_t addrs_0[VECTOR_LEN]; /* addresses vector */
haddr_t addrs_1[VECTOR_LEN]; /* addresses vector */
haddr_t addrs_2[VECTOR_LEN]; /* addresses vector */
size_t sizes_0[VECTOR_LEN]; /* sizes vector */
size_t sizes_1[VECTOR_LEN]; /* sizes vector */
size_t sizes_2[VECTOR_LEN]; /* sizes vector */
void * write_bufs_0[VECTOR_LEN]; /* write bufs vector */
void * write_bufs_1[VECTOR_LEN]; /* write bufs vector */
void * write_bufs_2[VECTOR_LEN]; /* write bufs vector */
void * read_bufs_0[VECTOR_LEN]; /* read bufs vector */
void * read_bufs_1[VECTOR_LEN]; /* read bufs vector */
void * read_bufs_2[VECTOR_LEN]; /* read bufs vector */
sprintf(test_title, "vector I/O with %s VFD", vfd_name);
TESTING(test_title);
/* Set property list and file name for target driver */
if((fapl_id = H5Pcreate(H5P_FILE_ACCESS)) < 0)
TEST_ERROR;
if ( strcmp(vfd_name, "sec2") == 0 ) {
if(H5Pset_fapl_sec2(fapl_id) < 0)
TEST_ERROR;
h5_fixname(FILENAME[0], fapl_id, filename, sizeof(filename));
} else if ( strcmp(vfd_name, "stdio") == 0 ) {
if(H5Pset_fapl_stdio(fapl_id) < 0)
TEST_ERROR;
h5_fixname(FILENAME[7], fapl_id, filename, sizeof filename);
} else {
HDfprintf(stdout, "un-supported VFD\n");
TEST_ERROR
}
/* setup the test vectors -- note that addresses are not set until
* we allocate space via the file driver.
*/
if ( ! ( test_vector_io__setup_v(count, types_0, addrs_0, sizes_0,
write_bufs_0, read_bufs_0, 'a') &&
test_vector_io__setup_v(count, types_1, addrs_1, sizes_1,
write_bufs_1, read_bufs_1, 'e') &&
test_vector_io__setup_v(count, types_2, addrs_2, sizes_2,
write_bufs_2, read_bufs_2, 'A') ) )
TEST_ERROR;
flags = H5F_ACC_RDWR | H5F_ACC_CREAT | H5F_ACC_TRUNC;
if ( NULL == (lf = H5FDopen(filename, flags, fapl_id, HADDR_UNDEF)))
TEST_ERROR;
/* allocate space for the data in the test vectors */
for ( i = 0; i < count; i++ ) {
addrs_0[i] = H5FDalloc(lf, types_0[i], H5P_DEFAULT,
(hsize_t)(sizes_0[i]));
addrs_1[i] = H5FDalloc(lf, types_1[i], H5P_DEFAULT,
(hsize_t)(sizes_1[i]));
addrs_2[i] = H5FDalloc(lf, types_2[i], H5P_DEFAULT,
(hsize_t)(sizes_2[i]));
if ( ( addrs_0[i] == HADDR_UNDEF ) ||
( addrs_1[i] == HADDR_UNDEF ) ||
( addrs_2[i] == HADDR_UNDEF ) )
TEST_ERROR;
}
if ( verbose ) {
test_vector_io__dump_test_vectors(count, types_0, addrs_0, sizes_0,
write_bufs_0, NULL, "zero");
test_vector_io__dump_test_vectors(count, types_1, addrs_1, sizes_1,
write_bufs_1, NULL, "one");
test_vector_io__dump_test_vectors(count, types_2, addrs_2, sizes_2,
write_bufs_2, NULL, "two");
}
/* write and then read using vector I/O. First, read/write vector
* of length 1, then of length 2, then remainder of vector
*/
if ( H5FDwrite_vector(lf, H5P_DEFAULT, 1, &(types_0[0]), &(addrs_0[0]),
&(sizes_0[0]), &(write_bufs_0[0])) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, 1, &(types_0[0]), &(addrs_0[0]),
&(sizes_0[0]), &(read_bufs_0[0])) < 0 )
TEST_ERROR;
if ( H5FDwrite_vector(lf, H5P_DEFAULT, 2, &(types_0[1]), &(addrs_0[1]),
&(sizes_0[1]), &(write_bufs_0[1])) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, 2, &(types_0[1]), &(addrs_0[1]),
&(sizes_0[1]), &(read_bufs_0[1])) < 0 )
TEST_ERROR;
if ( H5FDwrite_vector(lf, H5P_DEFAULT, count - 3, &(types_0[3]),
&(addrs_0[3]), &(sizes_0[3]), &(write_bufs_0[3])) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, count - 3, &(types_0[3]),
&(addrs_0[3]), &(sizes_0[3]), &(read_bufs_0[3])) < 0 )
TEST_ERROR;
/* verify that the expected data is read */
if ( ! test_vector_io__verify_v(count, types_0, sizes_0,
write_bufs_0, read_bufs_0, "zero") )
TEST_ERROR;
/* write the contents of a vector individually, and then read it back
* in several vector reads.
*/
if ( ! test_vector_io__write_v_indiv(lf, count, types_1, addrs_1,
sizes_1, write_bufs_1) )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, 1, &(types_1[0]), &(addrs_1[0]),
&(sizes_1[0]), &(read_bufs_1[0])) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, 2, &(types_1[1]), &(addrs_1[1]),
&(sizes_1[1]), &(read_bufs_1[1])) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, count - 3, &(types_1[3]),
&(addrs_1[3]), &(sizes_1[3]), &(read_bufs_1[3])) < 0 )
TEST_ERROR;
/* verify that the expected data is read */
if ( ! test_vector_io__verify_v(count, types_1, sizes_1,
write_bufs_1, read_bufs_1, "one") )
TEST_ERROR;
/* Write the contents of a vector as several vector writes, then
* read it back in idividual reads.
*/
if ( H5FDwrite_vector(lf, H5P_DEFAULT, 1, &(types_2[0]), &(addrs_2[0]),
&(sizes_2[0]), &(write_bufs_2[0])) < 0 )
TEST_ERROR;
if ( H5FDwrite_vector(lf, H5P_DEFAULT, 2, &(types_2[1]), &(addrs_2[1]),
&(sizes_2[1]), &(write_bufs_2[1])) < 0 )
TEST_ERROR;
if ( H5FDwrite_vector(lf, H5P_DEFAULT, count - 3, &(types_2[3]),
&(addrs_2[3]), &(sizes_2[3]), &(write_bufs_2[3])) < 0 )
TEST_ERROR;
if ( ! test_vector_io__read_v_indiv(lf, count, types_2, addrs_2,
sizes_2, read_bufs_2) )
TEST_ERROR;
/* verify that the expected data is read */
if ( ! test_vector_io__verify_v(count, types_2, sizes_2,
write_bufs_2, read_bufs_2, "two") )
TEST_ERROR;
/* make note of eoa -- needed after we re-open the file */
if ( HADDR_UNDEF == (eoa = H5FDget_eoa(lf, H5FD_MEM_DEFAULT)))
TEST_ERROR;
/* close the file and then re-open it */
if ( H5FDclose(lf) < 0 )
TEST_ERROR;
flags = H5F_ACC_RDWR ;
if ( NULL == (lf = H5FDopen(filename, flags, fapl_id, HADDR_UNDEF)))
TEST_ERROR;
/* The EOA is set to 0 on open. To avoid errors, we must set it
* to its correct value before we do any reads.
*
* Note: In the context of using the VFD layer without the HDF5
* library on top, this doesn't make much sense. Consider
* adding an open flag that sets the EOA to the current file
* size.
*/
if ( H5FDset_eoa(lf, H5FD_MEM_DEFAULT, eoa) < 0 )
TEST_ERROR;
/* Null the read vectors */
for ( i = 0 ; i < count; i++ ) {
buf = read_bufs_0[i];
for ( j = 0; j < sizes_0[i]; j++ ) {
buf[j] = '\0';
}
buf = read_bufs_1[i];
for ( j = 0; j < sizes_1[i]; j++ ) {
buf[j] = '\0';
}
buf = read_bufs_2[i];
for ( j = 0; j < sizes_2[i]; j++ ) {
buf[j] = '\0';
}
}
/* read the contents of the file */
if ( H5FDread_vector(lf, H5P_DEFAULT, count, types_0,
addrs_0, sizes_0, read_bufs_0) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, count, types_1,
addrs_1, sizes_1, read_bufs_1) < 0 )
TEST_ERROR;
if ( H5FDread_vector(lf, H5P_DEFAULT, count, types_2,
addrs_2, sizes_2, read_bufs_2) < 0 )
TEST_ERROR;
/* verify the contents. */
if ( ! test_vector_io__verify_v(count, types_0, sizes_0,
write_bufs_0, read_bufs_0, "zero-") )
TEST_ERROR;
if ( ! test_vector_io__verify_v(count, types_1, sizes_1,
write_bufs_1, read_bufs_1, "one-") )
TEST_ERROR;
if ( ! test_vector_io__verify_v(count, types_2, sizes_2,
write_bufs_2, read_bufs_2, "two-") )
TEST_ERROR;
if ( H5FDclose(lf) < 0 )
TEST_ERROR;
h5_delete_test_file(FILENAME[0], fapl_id);
/* Close the fapl */
if(H5Pclose(fapl_id) < 0)
TEST_ERROR;
/* discard the read and write buffers */
for ( i = 0; i < count; i++ ) {
HDfree(write_bufs_0[i]);
write_bufs_0[i] = NULL;
HDfree(write_bufs_1[i]);
write_bufs_1[i] = NULL;
HDfree(write_bufs_2[i]);
write_bufs_2[i] = NULL;
HDfree(read_bufs_0[i]);
read_bufs_0[i] = NULL;
HDfree(read_bufs_1[i]);
read_bufs_1[i] = NULL;
HDfree(read_bufs_2[i]);
read_bufs_2[i] = NULL;
}
PASSED();
return 0;
error:
H5E_BEGIN_TRY {
H5Pclose(fapl_id);
H5FDclose(lf);
} H5E_END_TRY;
return -1;
} /* end test_vector_io() */
/*-------------------------------------------------------------------------
* Function: test_subfiling
*
* Purpose: Tests the file handle interface for subfiling driver
*
* Richard:
*
* This test is serial only -- I'm including it
* because I used the sec2 VFD as the base of the skeletal
* sub-filing VFD. Needless to say, sub-filing proper will
* be parallel only, which implies that the associated test
* code will be in testpar.
*
* That said, we will eventually need to be able to do
* sub-file I/O in serial. Also, it may be appropriate to
* test subfiling property lists in the serial code.
*
* Thus you may want to keep this function in stub form once
* modify the subfiling skeleton, and move the test code
* to testpar.
* -- John
*
* Return: Success: 0
* Failure: -1
*
* Programmer: <prgrammer>
* <date>
*
*-------------------------------------------------------------------------
*/
static herr_t
test_subfiling(void)
{
hid_t fid = -1; /* file ID */
hid_t fapl_id = -1; /* file access property list ID */
hid_t fapl_id_out = -1; /* from H5Fget_access_plist */
hid_t driver_id = -1; /* ID for this VFD */
unsigned long driver_flags = 0; /* VFD feature flags */
char filename[1024]; /* filename */
void *os_file_handle = NULL; /* OS file handle */
hsize_t file_size; /* file size */
H5FD_subfiling_fapl_t fa_in = {H5FD_CURR_SUBFILING_FAPL_T_VERSION};
H5FD_subfiling_fapl_t fa_out;
TESTING("subfiling file driver");
/* Set property list and file name for subfiling driver. */
if((fapl_id = H5Pcreate(H5P_FILE_ACCESS)) < 0)
TEST_ERROR;
if(H5Pset_fapl_subfiling(fapl_id, &fa_in) < 0)
TEST_ERROR;
/* get and verify the H5FD_subfiling_fapl_t */
if(H5Pget_fapl_subfiling(fapl_id, &fa_out) < 0)
TEST_ERROR;
if(fa_out.version != H5FD_CURR_SUBFILING_FAPL_T_VERSION)
TEST_ERROR;
h5_fixname(FILENAME[0], fapl_id, filename, sizeof(filename));
/* Check that the VFD feature flags are correct */
if ((driver_id = H5Pget_driver(fapl_id)) < 0)
TEST_ERROR
if (H5FDdriver_query(driver_id, &driver_flags) < 0)
TEST_ERROR
if(!(driver_flags & H5FD_FEAT_AGGREGATE_METADATA)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_ACCUMULATE_METADATA)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_DATA_SIEVE)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_AGGREGATE_SMALLDATA)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_POSIX_COMPAT_HANDLE)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_SUPPORTS_SWMR_IO)) TEST_ERROR
if(!(driver_flags & H5FD_FEAT_DEFAULT_VFD_COMPATIBLE)) TEST_ERROR
/* Check for extra flags not accounted for above */
if(driver_flags != (H5FD_FEAT_AGGREGATE_METADATA
| H5FD_FEAT_ACCUMULATE_METADATA
| H5FD_FEAT_DATA_SIEVE
| H5FD_FEAT_AGGREGATE_SMALLDATA
| H5FD_FEAT_POSIX_COMPAT_HANDLE
| H5FD_FEAT_SUPPORTS_SWMR_IO
| H5FD_FEAT_DEFAULT_VFD_COMPATIBLE))
TEST_ERROR
if((fid = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id)) < 0)
TEST_ERROR;
/* Retrieve the access property list... */
if((fapl_id_out = H5Fget_access_plist(fid)) < 0)
TEST_ERROR;
/* Check that the driver is correct */
if(H5FD_SUBFILING != H5Pget_driver(fapl_id_out))
TEST_ERROR;
/* get and verify the H5FD_subfiling_fapl_t again */
if(H5Pget_fapl_subfiling(fapl_id_out, &fa_out) < 0)
TEST_ERROR;
if(fa_out.version != H5FD_CURR_SUBFILING_FAPL_T_VERSION)
TEST_ERROR;
/* ...and close the property list */
if(H5Pclose(fapl_id_out) < 0)
TEST_ERROR;
/* Check that we can get an operating-system-specific handle from
* the library.
*
* Not sure that this will be meaningful in the subfiling case.
*/
if(H5Fget_vfd_handle(fid, H5P_DEFAULT, &os_file_handle) < 0)
TEST_ERROR;
if(os_file_handle == NULL)
FAIL_PUTS_ERROR("NULL os-specific vfd/file handle was returned from H5Fget_vfd_handle");
/* There is no garantee the size of metadata in file is constant.
* Just try to check if it's reasonable.
*
* Currently it should be around 2 KB.
*/
if(H5Fget_filesize(fid, &file_size) < 0)
TEST_ERROR;
if(file_size < 1 * KB || file_size > 4 * KB)
FAIL_PUTS_ERROR("suspicious file size obtained from H5Fget_filesize");
/* Close and delete the file */
if(H5Fclose(fid) < 0)
TEST_ERROR;
h5_delete_test_file(FILENAME[0], fapl_id);
/* Close the fapl */
if(H5Pclose(fapl_id) < 0)
TEST_ERROR;
PASSED();
return 0;
error:
H5E_BEGIN_TRY {
H5Pclose(fapl_id);
H5Pclose(fapl_id_out);
H5Fclose(fid);
} H5E_END_TRY;
return -1;
} /* end test_subfiling() */
/*-------------------------------------------------------------------------
* Function: main
*
@@ -3288,19 +4176,25 @@ main(void)
HDprintf("Testing basic Virtual File Driver functionality.\n");
nerrors += test_sec2() < 0 ? 1 : 0;
nerrors += test_core() < 0 ? 1 : 0;
nerrors += test_direct() < 0 ? 1 : 0;
nerrors += test_family() < 0 ? 1 : 0;
nerrors += test_family_compat() < 0 ? 1 : 0;
nerrors += test_family_member_fapl() < 0 ? 1 : 0;
nerrors += test_multi() < 0 ? 1 : 0;
nerrors += test_multi_compat() < 0 ? 1 : 0;
nerrors += test_log() < 0 ? 1 : 0;
nerrors += test_stdio() < 0 ? 1 : 0;
nerrors += test_windows() < 0 ? 1 : 0;
nerrors += test_ros3() < 0 ? 1 : 0;
nerrors += test_splitter() < 0 ? 1 : 0;
setup_rand();
nerrors += test_sec2() < 0 ? 1 : 0;
nerrors += test_core() < 0 ? 1 : 0;
nerrors += test_direct() < 0 ? 1 : 0;
nerrors += test_family() < 0 ? 1 : 0;
nerrors += test_family_compat() < 0 ? 1 : 0;
nerrors += test_family_member_fapl() < 0 ? 1 : 0;
nerrors += test_multi() < 0 ? 1 : 0;
nerrors += test_multi_compat() < 0 ? 1 : 0;
nerrors += test_log() < 0 ? 1 : 0;
nerrors += test_stdio() < 0 ? 1 : 0;
nerrors += test_windows() < 0 ? 1 : 0;
nerrors += test_ros3() < 0 ? 1 : 0;
nerrors += test_splitter() < 0 ? 1 : 0;
nerrors += test_vector_io("sec2") < 0 ? 1 : 0;
nerrors += test_vector_io("stdio") < 0 ? 1 : 0;
nerrors += test_subfiling() < 0 ? 1 : 0;
if(nerrors) {
HDprintf("***** %d Virtual File Driver TEST%s FAILED! *****\n",

View File

@@ -75,6 +75,8 @@ set (H5P_TESTS
t_shapesame
t_filters_parallel
t_2Gio
t_subfile_openclose
t_subfile_readwrite
)
foreach (h5_testp ${H5P_TESTS})

View File

@@ -30,7 +30,8 @@ check_SCRIPTS = $(TEST_SCRIPT_PARA)
# Test programs. These are our main targets.
#
TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pread t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel t_2Gio
TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pread t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel t_2Gio \
t_subfile_openclose t_subfile_readwrite
# t_pflush1 and t_pflush2 are used by testpflush.sh
check_PROGRAMS = $(TEST_PROG_PARA) t_pflush1 t_pflush2

View File

@@ -0,0 +1,50 @@
#include <stdio.h>
#include "hdf5.h"
#include "H5FDsubfile_public.h"
#include "mpi.h"
int
main(int argc, char **argv)
{
int i, mpi_size, mpi_rank;
int loop_count = 20;
int mpi_provides, require = MPI_THREAD_MULTIPLE;
hid_t subfile_id = -1;
MPI_Init_thread(&argc, &argv, require, &mpi_provides);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
if (argc > 1) {
int check_value = atoi(argv[1]);
if (check_value > 0) {
loop_count = check_value;
}
}
H5open();
if (H5FDsubfiling_init() == SUCCEED) {
subfile_id = get_subfiling_context();
printf("[%d] subfile_id = %lx\n", mpi_rank, subfile_id);
}
else if (mpi_rank == 0) {
puts("Error: Unable to initialize subfiling!");
}
for(i=0; i < loop_count; i++) {
sf_open_subfiles(subfile_id, NULL, O_CREAT|O_TRUNC|O_RDWR);
sf_close_subfiles(subfile_id);
}
H5FDsubfiling_finalize();
MPI_Barrier(MPI_COMM_WORLD);
H5close();
MPI_Finalize();
return 0;
}

View File

@@ -0,0 +1,170 @@
#include <stdio.h>
#include "hdf5.h"
#include "H5FDsubfile_public.h"
#include "mpi.h"
/* Elements per rank */
#define DATA_SIZE 100000
#define WRITE_OP 1
#define READ_OP 2
int mpi_size = -1;
int mpi_rank = -1;
static int test_subfile_op(int op_type, hid_t subfile_id, int64_t offset, int64_t local_elements, void *local_data)
{
int i, flags = O_RDWR;
int errors = 0;
int loop_count = 20;
int64_t local_data_size = local_elements * sizeof(int);
int64_t total_data_size = 0;
int (*subfile_ftn)(hid_t context_id,int64_t offset, int64_t elements, int dtype_extent, void *data) = sf_read_independent;
double m_startTime, m_endTime;
double this_time, max_time, min_time, total_time, avg_time;
double bw;
const char *OPERATION = "READ";
if (op_type == WRITE_OP) {
flags = O_CREAT|O_TRUNC|O_RDWR;
subfile_ftn = sf_write_independent;
OPERATION = "WRITE";
}
for(i=0; i < loop_count; i++) {
m_startTime = MPI_Wtime();
if (sf_open_subfiles(subfile_id, NULL, flags) < 0) {
puts("sf_open_subfiles returned an error!");
errors++;
goto done;
}
if (subfile_ftn(subfile_id, offset, local_elements, sizeof(int), local_data) < 0) {
puts("subfile_ftn returned an error!");
errors++;
goto done;
}
if (sf_close_subfiles(subfile_id) < 0) {
puts("sf_close_subfiles returned an error!");
errors++;
goto done;
}
m_endTime = MPI_Wtime();
this_time = m_endTime - m_startTime;
if (i == 0) {
min_time = this_time;
max_time = this_time;
}
else if (this_time < min_time) {
min_time = this_time;
}
if (this_time > max_time) {
max_time = this_time;
}
total_time += this_time;
}
total_data_size = local_data_size * mpi_size;
avg_time = total_time / (double) loop_count;
bw = ((double)total_data_size)/ avg_time / (1024.0 * 1024.0);
if (mpi_rank == 0) {
printf("%s Perf: %lf BW/[MBs] %ld Bytes AvgTime[sec] %lf\n", OPERATION, bw, total_data_size, avg_time);
fflush(stdout);
}
done:
return errors;
}
int
main(int argc, char **argv)
{
int errors = 0;
int mpi_provides, require = MPI_THREAD_MULTIPLE;
hid_t subfile_id = -1;
double m_startTime, m_endTime;
double this_time, max_time, min_time, total_time, avg_time;
double bw;
int64_t local_elements = DATA_SIZE;
int64_t local_data_size = 0;
int64_t offset = 0;
int *local_data = NULL;
int *verify_data = NULL;
MPI_Init_thread(&argc, &argv, require, &mpi_provides);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
if (argc > 1) {
int64_t check_value = atoll(argv[1]);
if (check_value > 0) {
local_elements = check_value;
}
}
H5open();
local_data_size = local_elements * sizeof(int);
local_data = (int *)malloc((size_t)local_data_size);
if (local_data) {
int k, base = local_elements * mpi_rank;
offset = local_data_size * mpi_rank;
for (k=0; k < local_elements; k++) {
local_data[k] = k + base;
}
}
else {
perror("malloc failure");
errors++;
goto done;
}
verify_data = (int *)malloc((size_t)local_data_size);
if (verify_data == NULL) {
perror("malloc failure");
errors++;
goto done;
}
if (H5FDsubfiling_init() == SUCCEED) {
subfile_id = get_subfiling_context();
printf("[%d] subfile_id = %lx\n", mpi_rank, subfile_id);
}
else if (mpi_rank == 0) {
puts("Error: Unable to initialize subfiling!");
errors++;
goto done;
}
if (test_subfile_op( WRITE_OP, subfile_id, offset, local_elements, local_data)) {
puts("Subfile writing test returned an error!");
errors++;
goto done;
}
if (test_subfile_op( READ_OP, subfile_id, offset, local_elements, verify_data)) {
puts("Subfile reading test returned an error!");
errors++;
goto done;
}
done:
H5FDsubfiling_finalize();
MPI_Barrier(MPI_COMM_WORLD);
if (local_data) {
free(local_data);
local_data = NULL;
}
if (verify_data) {
free(verify_data);
verify_data = NULL;
}
H5close();
MPI_Finalize();
return 0;
}

View File

@@ -899,7 +899,7 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char
H5TOOLS_DEBUG("groups traversed - errstat:%d", opts->err_stat);
#ifdef H5_HAVE_PARALLEL
if(g_Parallel) {
if(g_Parallel && !g_CollectInfoOnly) {
int i;
if((HDstrlen(fname1) > MAX_FILENAME) || (HDstrlen(fname2) > MAX_FILENAME)) {
@@ -914,6 +914,11 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char
for(i = 1; i < g_nTasks; i++)
MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD);
} /* end if */
else if (g_CollectInfoOnly) {
build_match_list (obj1fullname, info1_lp, obj2fullname, info2_lp, &match_list, opts);
}
#endif
H5TOOLS_DEBUG("build_match_list next - errstat:%d", opts->err_stat);

View File

@@ -48,6 +48,7 @@ hsize_t H5TOOLS_BUFSIZE = ( 32 * 1024 * 1024); /* 32 MB */
/* ``parallel_print'' variables */
unsigned char g_Parallel = 0; /*0 for serial, 1 for parallel */
unsigned char g_CollectInfoOnly = 0;
char outBuff[OUTBUFF_SIZE];
unsigned outBuffOffset;
FILE* overflow_file = NULL;

View File

@@ -32,6 +32,7 @@ extern "C" {
H5TOOLS_DLLVAR int g_nTasks;
H5TOOLS_DLLVAR unsigned char g_Parallel;
H5TOOLS_DLLVAR unsigned char g_CollectInfoOnly;
H5TOOLS_DLLVAR char outBuff[];
H5TOOLS_DLLVAR unsigned outBuffOffset;
H5TOOLS_DLLVAR FILE *overflow_file;

View File

@@ -15,6 +15,9 @@
#include "h5trav.h"
#include "h5tools.h"
#include "H5private.h"
#ifdef H5_HAVE_PARALLEL
#include "h5tools_utils.h"
#endif
/*-------------------------------------------------------------------------
* local typedefs
@@ -179,8 +182,10 @@ static herr_t
traverse_cb(hid_t loc_id, const char *path, const H5L_info2_t *linfo,
void *_udata)
{
herr_t ret_value = SUCCEED;
trav_ud_traverse_t *udata = (trav_ud_traverse_t *)_udata; /* User data */
char *new_name = NULL;
const char *full_name;
const char *already_visited = NULL; /* Whether the link/object was already visited */
@@ -201,6 +206,18 @@ traverse_cb(hid_t loc_id, const char *path, const H5L_info2_t *linfo,
else
full_name = path;
#ifdef H5_HAVE_PARALLEL
if(linfo->type == H5L_TYPE_EXTERNAL) {
h5tool_link_info_t lnk_info;
if ((ret_value = H5tools_get_symlink_info(loc_id, path, &lnk_info, FALSE)) < 0) {
puts("H5tools_get_symlink_info failed!");
}
else if (ret_value == 0) {
puts("Dangling link?");
}
printf("Visiting external link: %s\n", path);
}
#endif
/* Perform the correct action for different types of links */
if(linfo->type == H5L_TYPE_HARD) {
H5O_info2_t oinfo;