Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/example/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PS=@PSIO@
NETMAP=@NETMAP@
ONVM=@ONVM@
CCP=@CCP@
UCTX=@UCTX@
CFLAGS=@CFLAGS@

# Add arch-specific optimization
Expand Down Expand Up @@ -76,6 +77,10 @@ LIBS += -L$(LIBCCP) -lccp -lstartccp
INC += -I$(LIBCCP)
endif

ifeq ($(UCTX),1)
CFLAGS += -DENABLE_UCTX
endif

all: epserver epwget

epserver.o: epserver.c
Expand Down
45 changes: 33 additions & 12 deletions apps/example/epserver.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct server_vars
struct thread_context
{
mctx_t mctx;
int core;
int ep;
struct server_vars *svars;
};
Expand Down Expand Up @@ -361,6 +362,8 @@ InitializeServerThread(int core)
return NULL;
}

ctx->core = core;

return ctx;
}
/*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -410,10 +413,10 @@ CreateListeningSocket(struct thread_context *ctx)
return listener;
}
/*----------------------------------------------------------------------------*/
void *
RunServerThread(void *arg)
void
RunServerContext(void *arg)
{
int core = *(int *)arg;
int core;
struct thread_context *ctx;
mctx_t mctx;
int listener;
Expand All @@ -422,16 +425,11 @@ RunServerThread(void *arg)
int nevents;
int i, ret;
int do_accept;

/* initialization */
ctx = InitializeServerThread(core);
if (!ctx) {
TRACE_ERROR("Failed to initialize server thread.\n");
return NULL;
}
mctx = ctx->mctx;
ep = ctx->ep;

ctx = (struct thread_context *) arg;
ep = ctx->ep;
mctx = ctx->mctx;
core = ctx->core;
events = (struct mtcp_epoll_event *)
calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
if (!events) {
Expand Down Expand Up @@ -519,7 +517,30 @@ RunServerThread(void *arg)
}

}
}
/*----------------------------------------------------------------------------*/
void *
RunServerThread(void *arg)
{
int core = *(int *)arg;
struct thread_context *ctx;
mctx_t mctx;

/* initialization */
ctx = InitializeServerThread(core);
if (!ctx) {
TRACE_ERROR("Failed to initialize server thread.\n");
return NULL;
}
mctx = ctx->mctx;

#ifdef ENABLE_UCTX
mtcp_create_app_context(mctx, (mtcp_app_func_t) RunServerContext, (void *) ctx);
mtcp_run_app();
#else
RunServerContext(ctx);
#endif

/* destroy mtcp context: this will kill the mtcp thread */
mtcp_destroy_context(mctx);
pthread_exit(NULL);
Expand Down
131 changes: 79 additions & 52 deletions apps/example/epwget.c
Original file line number Diff line number Diff line change
Expand Up @@ -533,73 +533,35 @@ PrintStats()
#endif
}
/*----------------------------------------------------------------------------*/
void *
RunWgetMain(void *arg)
void
RunWgetAppMain(void *arg)
{
thread_context_t ctx;
mctx_t mctx;
int core = *(int *)arg;
struct in_addr daddr_in;
int n, maxevents;
int core;
int nevents;
int maxevents;
int ep;
struct mtcp_epoll_event *events;
int nevents;
struct wget_vars *wvars;
int i;

struct timeval cur_tv, prev_tv;
//uint64_t cur_ts, prev_ts;

mtcp_core_affinitize(core);

ctx = CreateContext(core);
if (!ctx) {
return NULL;
}
mctx = ctx->mctx;
g_ctx[core] = ctx;
g_stat[core] = &ctx->stat;
srand(time(NULL));

mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);

n = flows[core];
if (n == 0) {
TRACE_DBG("Application thread %d finished.\n", core);
pthread_exit(NULL);
return NULL;
}
ctx->target = n;

daddr_in.s_addr = daddr;
fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
core, n, inet_ntoa(daddr_in), ntohs(dport));

/* Initialization */

ctx = (thread_context_t) arg;
mctx = ctx->mctx;
core = ctx->core;
ep = ctx->ep;
wvars = ctx->wvars;
maxevents = max_fds * 3;
ep = mtcp_epoll_create(mctx, maxevents);
if (ep < 0) {
TRACE_ERROR("Failed to create epoll struct!n");
exit(EXIT_FAILURE);
}

events = (struct mtcp_epoll_event *)
calloc(maxevents, sizeof(struct mtcp_epoll_event));
if (!events) {
TRACE_ERROR("Failed to allocate events!\n");
exit(EXIT_FAILURE);
}
ctx->ep = ep;

wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
if (!wvars) {
TRACE_ERROR("Failed to create wget variables!\n");
exit(EXIT_FAILURE);
}
ctx->wvars = wvars;

ctx->started = ctx->done = ctx->pending = 0;
ctx->errors = ctx->incompletes = 0;


gettimeofday(&cur_tv, NULL);
//prev_ts = TIMEVAL_TO_USEC(cur_tv);
prev_tv = cur_tv;
Expand All @@ -620,7 +582,7 @@ RunWgetMain(void *arg)
break;
}
}

nevents = mtcp_epoll_wait(mctx, ep, events, maxevents, -1);
ctx->stat.waits++;

Expand Down Expand Up @@ -679,6 +641,71 @@ RunWgetMain(void *arg)
}
}

}

/*----------------------------------------------------------------------------*/
void *
RunWgetMain(void *arg)
{
thread_context_t ctx;
mctx_t mctx;
int core = *(int *)arg;
struct in_addr daddr_in;
int n, maxevents;
int ep;
struct wget_vars *wvars;

mtcp_core_affinitize(core);

ctx = CreateContext(core);
if (!ctx) {
return NULL;
}
mctx = ctx->mctx;
g_ctx[core] = ctx;
g_stat[core] = &ctx->stat;
srand(time(NULL));

mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);

n = flows[core];
if (n == 0) {
TRACE_DBG("Application thread %d finished.\n", core);
pthread_exit(NULL);
return NULL;
}
ctx->target = n;

daddr_in.s_addr = daddr;
fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
core, n, inet_ntoa(daddr_in), ntohs(dport));

/* Initialization */
maxevents = max_fds * 3;
ep = mtcp_epoll_create(mctx, maxevents);
if (ep < 0) {
TRACE_ERROR("Failed to create epoll struct!n");
exit(EXIT_FAILURE);
}
ctx->ep = ep;

wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
if (!wvars) {
TRACE_ERROR("Failed to create wget variables!\n");
exit(EXIT_FAILURE);
}
ctx->wvars = wvars;

ctx->started = ctx->done = ctx->pending = 0;
ctx->errors = ctx->incompletes = 0;

#ifdef ENABLE_UCTX
mtcp_create_app_context(mctx, (mtcp_app_func_t) RunWgetAppMain, (void *) ctx);
mtcp_run_app();
#else
RunWgetAppMain(ctx);
#endif

TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
DestroyContext(ctx);

Expand Down
13 changes: 13 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ AC_SUBST(ONVM, 0)
AC_SUBST(NETMAP, 0)
# Reset HWCSUM to 1
AC_SUBST(HWCSUM, 1)
# Reset UCTX to 0
AC_SUBST(UCTX, 0)

# Check dpdk-17.08 path (lib & inc)
AC_ARG_WITH(stuff, [ --with-dpdk-lib path to the dpdk-17.08 install root])
Expand Down Expand Up @@ -124,6 +126,17 @@ then
AC_SUBST(HWCSUM, 0)
fi

AC_ARG_ENABLE([uctx],
AS_HELP_STRING([--enable-uctx], [Enable user-level threading]))

if test "$with_dpdk_lib" != ""
then
if test "x$enable_uctx" = "xyes"
then
AC_SUBST(UCTX, 1)
fi
fi

# Check psio path (lib & inc)
AC_ARG_WITH(stuff, [ --with-psio-lib path to the ioengine install root])
if test "$with_psio_lib" != ""
Expand Down
10 changes: 10 additions & 0 deletions mtcp/src/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ NETMAP=@NETMAP@
ONVM=@ONVM@
LRO=@LRO@
CCP=@CCP@
UCTX=@UCTX@
HWCSUM=@HWCSUM@
MTCP_LIB_DIR=../lib
MTCP_LIB=libmtcp.a
Expand Down Expand Up @@ -125,6 +126,15 @@ ifeq ($(CCP), 1)
SRCS += ccp.c
endif

# user-level threading with intel_lthread (experimental)
ifeq ($(UCTX), 1)
GCC_OPT += -DENABLE_UCTX
SRCS += intel_lthread/lthread.c intel_lthread/lthread_cond.c intel_lthread/lthread_diag.c \
intel_lthread/lthread_mutex.c intel_lthread/lthread_sched.c intel_lthread/lthread_tls.c \
intel_lthread/arch/x86/ctx.c schedule.c
INC += -I./intel_lthread/ -I./intel_lthread/arch/x86/
endif

OBJS = $(patsubst %.c,%.o,$(SRCS))
DEPS = $(patsubst %.c,.%.d,$(SRCS))

Expand Down
20 changes: 16 additions & 4 deletions mtcp/src/addr_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ CreateAddressPool(in_addr_t addr_base, int num_addr)
return NULL;
}

#ifndef ENABLE_UCTX
pthread_mutex_lock(&ap->lock);
#endif

ap->addr_base = ntohl(addr_base);
ap->num_addr = num_addr;
Expand All @@ -95,9 +97,11 @@ CreateAddressPool(in_addr_t addr_base, int num_addr)
ap->num_entry = cnt;
ap->num_free = cnt;
ap->num_used = 0;

pthread_mutex_unlock(&ap->lock);

#ifndef ENABLE_UCTX
pthread_mutex_unlock(&ap->lock);
#endif

return ap;
}
/*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -148,7 +152,9 @@ CreateAddressPoolPerCore(int core, int num_queues,
return NULL;
}

#ifndef ENABLE_UCTX
pthread_mutex_lock(&ap->lock);
#endif

ap->addr_base = ntohl(saddr_base);
ap->num_addr = num_addr;
Expand Down Expand Up @@ -186,8 +192,10 @@ CreateAddressPoolPerCore(int core, int num_queues,
" the max concurrency (%d).\n",
ap->num_entry, CONFIG.max_concurrency);
}


#ifndef ENABLE_UCTX
pthread_mutex_unlock(&ap->lock);
#endif

return ap;
}
Expand Down Expand Up @@ -282,7 +290,9 @@ FetchAddressPerCore(addr_pool_t ap, int core, int num_queues,
if (!ap || !daddr || !saddr)
return -1;

#ifndef ENABLE_UCTX
pthread_mutex_lock(&ap->lock);
#endif

/* we don't need to calculate RSSCPUCore if mtcp_init_rss is called */
walk = TAILQ_FIRST(&ap->free_list);
Expand All @@ -294,8 +304,10 @@ FetchAddressPerCore(addr_pool_t ap, int core, int num_queues,
ap->num_used++;
ret = 0;
}


#ifndef ENABLE_UCTX
pthread_mutex_unlock(&ap->lock);
#endif

return ret;
}
Expand Down
Loading